http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyServerConfig.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyServerConfig.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyServerConfig.java new file mode 100644 index 0000000..7922206 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyServerConfig.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting.netty; + +/** + + * + * @author shijia.wxr + * + */ +public class NettyServerConfig implements Cloneable { + private int listenPort = 8888; + private int serverWorkerThreads = 8; + private int serverCallbackExecutorThreads = 0; + private int serverSelectorThreads = 3; + private int serverOnewaySemaphoreValue = 256; + private int serverAsyncSemaphoreValue = 64; + private int serverChannelMaxIdleTimeSeconds = 120; + + private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; + private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; + private boolean serverPooledByteBufAllocatorEnable = true; + + /** + * make make install + * + * + * ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \ + * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd + */ + private boolean useEpollNativeSelector = false; + + + public int getListenPort() { + return listenPort; + } + + + public void setListenPort(int listenPort) { + this.listenPort = listenPort; + } + + + public int getServerWorkerThreads() { + return serverWorkerThreads; + } + + + public void setServerWorkerThreads(int serverWorkerThreads) { + this.serverWorkerThreads = serverWorkerThreads; + } + + + public int getServerSelectorThreads() { + return serverSelectorThreads; + } + + + public void setServerSelectorThreads(int serverSelectorThreads) { + this.serverSelectorThreads = serverSelectorThreads; + } + + + public int getServerOnewaySemaphoreValue() { + return serverOnewaySemaphoreValue; + } + + + public void setServerOnewaySemaphoreValue(int serverOnewaySemaphoreValue) { + this.serverOnewaySemaphoreValue = serverOnewaySemaphoreValue; + } + + + public int getServerCallbackExecutorThreads() { + return serverCallbackExecutorThreads; + } + + + public void setServerCallbackExecutorThreads(int serverCallbackExecutorThreads) { + this.serverCallbackExecutorThreads = serverCallbackExecutorThreads; + } + + + public int getServerAsyncSemaphoreValue() { + return serverAsyncSemaphoreValue; + } + + + public void setServerAsyncSemaphoreValue(int serverAsyncSemaphoreValue) { + this.serverAsyncSemaphoreValue = serverAsyncSemaphoreValue; + } + + + public int getServerChannelMaxIdleTimeSeconds() { + return serverChannelMaxIdleTimeSeconds; + } + + + public void setServerChannelMaxIdleTimeSeconds(int serverChannelMaxIdleTimeSeconds) { + this.serverChannelMaxIdleTimeSeconds = serverChannelMaxIdleTimeSeconds; + } + + + public int getServerSocketSndBufSize() { + return serverSocketSndBufSize; + } + + + public void setServerSocketSndBufSize(int serverSocketSndBufSize) { + this.serverSocketSndBufSize = serverSocketSndBufSize; + } + + + public int getServerSocketRcvBufSize() { + return serverSocketRcvBufSize; + } + + + public void setServerSocketRcvBufSize(int serverSocketRcvBufSize) { + this.serverSocketRcvBufSize = serverSocketRcvBufSize; + } + + + public boolean isServerPooledByteBufAllocatorEnable() { + return serverPooledByteBufAllocatorEnable; + } + + + public void setServerPooledByteBufAllocatorEnable(boolean serverPooledByteBufAllocatorEnable) { + this.serverPooledByteBufAllocatorEnable = serverPooledByteBufAllocatorEnable; + } + + + public boolean isUseEpollNativeSelector() { + return useEpollNativeSelector; + } + + + public void setUseEpollNativeSelector(boolean useEpollNativeSelector) { + this.useEpollNativeSelector = useEpollNativeSelector; + } + + @Override + public Object clone() throws CloneNotSupportedException { + return (NettyServerConfig) super.clone(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettySystemConfig.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettySystemConfig.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettySystemConfig.java new file mode 100644 index 0000000..41589ce --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettySystemConfig.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.remoting.netty; + +public class NettySystemConfig { + public static final String COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = + "com.rocketmq.remoting.nettyPooledByteBufAllocatorEnable"; + public static final String COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE = // + "com.rocketmq.remoting.socket.sndbuf.size"; + public static final String COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE = // + "com.rocketmq.remoting.socket.rcvbuf.size"; + public static final String COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE = // + "com.rocketmq.remoting.clientAsyncSemaphoreValue"; + public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE = // + "com.rocketmq.remoting.clientOnewaySemaphoreValue"; + public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = // + Boolean + .parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false")); + public static int socketSndbufSize = // + Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535")); + public static int socketRcvbufSize = // + Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535")); + public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = // + Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535")); + public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE = // + Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535")); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/RequestTask.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/RequestTask.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/RequestTask.java new file mode 100644 index 0000000..e02ae48 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/RequestTask.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.remoting.netty; + + +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import io.netty.channel.Channel; + +public class RequestTask implements Runnable { + private final Runnable runnable; + private final long createTimestamp = System.currentTimeMillis(); + private final Channel channel; + private final RemotingCommand request; + private boolean stopRun = false; + + public RequestTask(final Runnable runnable, final Channel channel, final RemotingCommand request) { + this.runnable = runnable; + this.channel = channel; + this.request = request; + } + + @Override + public int hashCode() { + int result = runnable != null ? runnable.hashCode() : 0; + result = 31 * result + (int) (getCreateTimestamp() ^ (getCreateTimestamp() >>> 32)); + result = 31 * result + (channel != null ? channel.hashCode() : 0); + result = 31 * result + (request != null ? request.hashCode() : 0); + result = 31 * result + (isStopRun() ? 1 : 0); + return result; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (!(o instanceof RequestTask)) return false; + + final RequestTask that = (RequestTask) o; + + if (getCreateTimestamp() != that.getCreateTimestamp()) return false; + if (isStopRun() != that.isStopRun()) return false; + if (channel != null ? !channel.equals(that.channel) : that.channel != null) return false; + return request != null ? request.getOpaque() == that.request.getOpaque() : that.request == null; + + } + + public long getCreateTimestamp() { + return createTimestamp; + } + + public boolean isStopRun() { + return stopRun; + } + + public void setStopRun(final boolean stopRun) { + this.stopRun = stopRun; + } + + @Override + public void run() { + if (!this.stopRun) this.runnable.run(); + } + + public void returnResponse(int code, String remark) { + final RemotingCommand response = RemotingCommand.createResponseCommand(code, remark); + response.setOpaque(request.getOpaque()); + this.channel.writeAndFlush(response); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/ResponseFuture.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/ResponseFuture.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/ResponseFuture.java new file mode 100644 index 0000000..b6185f1 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/ResponseFuture.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting.netty; + +import com.alibaba.rocketmq.remoting.InvokeCallback; +import com.alibaba.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + + +/** + * @author shijia.wxr + */ +public class ResponseFuture { + private final int opaque; + private final long timeoutMillis; + private final InvokeCallback invokeCallback; + private final long beginTimestamp = System.currentTimeMillis(); + private final CountDownLatch countDownLatch = new CountDownLatch(1); + + private final SemaphoreReleaseOnlyOnce once; + + private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false); + private volatile RemotingCommand responseCommand; + private volatile boolean sendRequestOK = true; + private volatile Throwable cause; + + + public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback, + SemaphoreReleaseOnlyOnce once) { + this.opaque = opaque; + this.timeoutMillis = timeoutMillis; + this.invokeCallback = invokeCallback; + this.once = once; + } + + + public void executeInvokeCallback() { + if (invokeCallback != null) { + if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) { + invokeCallback.operationComplete(this); + } + } + } + + + public void release() { + if (this.once != null) { + this.once.release(); + } + } + + + public boolean isTimeout() { + long diff = System.currentTimeMillis() - this.beginTimestamp; + return diff > this.timeoutMillis; + } + + + public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException { + this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); + return this.responseCommand; + } + + + public void putResponse(final RemotingCommand responseCommand) { + this.responseCommand = responseCommand; + this.countDownLatch.countDown(); + } + + + public long getBeginTimestamp() { + return beginTimestamp; + } + + + public boolean isSendRequestOK() { + return sendRequestOK; + } + + + public void setSendRequestOK(boolean sendRequestOK) { + this.sendRequestOK = sendRequestOK; + } + + + public long getTimeoutMillis() { + return timeoutMillis; + } + + + public InvokeCallback getInvokeCallback() { + return invokeCallback; + } + + + public Throwable getCause() { + return cause; + } + + + public void setCause(Throwable cause) { + this.cause = cause; + } + + + public RemotingCommand getResponseCommand() { + return responseCommand; + } + + + public void setResponseCommand(RemotingCommand responseCommand) { + this.responseCommand = responseCommand; + } + + + public int getOpaque() { + return opaque; + } + + + @Override + public String toString() { + return "ResponseFuture [responseCommand=" + responseCommand + ", sendRequestOK=" + sendRequestOK + + ", cause=" + cause + ", opaque=" + opaque + ", timeoutMillis=" + timeoutMillis + + ", invokeCallback=" + invokeCallback + ", beginTimestamp=" + beginTimestamp + + ", countDownLatch=" + countDownLatch + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/LanguageCode.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/LanguageCode.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/LanguageCode.java new file mode 100644 index 0000000..9f4adbe --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/LanguageCode.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.remoting.protocol; + +public enum LanguageCode { + JAVA((byte) 0), + CPP((byte) 1), + DOTNET((byte) 2), + PYTHON((byte) 3), + DELPHI((byte) 4), + ERLANG((byte) 5), + RUBY((byte) 6), + OTHER((byte) 7), + HTTP((byte) 8); + + private byte code; + + LanguageCode(byte code) { + this.code = code; + } + + public static LanguageCode valueOf(byte code) { + for (LanguageCode languageCode : LanguageCode.values()) { + if (languageCode.getCode() == code) { + return languageCode; + } + } + return null; + } + + public byte getCode() { + return code; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommand.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommand.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommand.java new file mode 100644 index 0000000..a09dd3b --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommand.java @@ -0,0 +1,569 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting.protocol; + +import com.alibaba.fastjson.annotation.JSONField; +import com.alibaba.rocketmq.remoting.CommandCustomHeader; +import com.alibaba.rocketmq.remoting.annotation.CFNotNull; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; +import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.annotation.Annotation; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + + +/** + * @author shijia.wxr + */ +public class RemotingCommand { + public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type"; + public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE"; + private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); + private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND + private static final int RPC_ONEWAY = 1; // 0, RPC + + private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP = + new HashMap<Class<? extends CommandCustomHeader>, Field[]>(); + private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<Class, String>(); + // 1, RESPONSE_COMMAND + private static final Map<Field, Annotation> NOT_NULL_ANNOTATION_CACHE = new HashMap<Field, Annotation>(); + // 1, Oneway + + private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName(); + private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName(); + private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName(); + private static final String INTEGER_CANONICAL_NAME_1 = Integer.class.getCanonicalName(); + private static final String INTEGER_CANONICAL_NAME_2 = int.class.getCanonicalName(); + private static final String LONG_CANONICAL_NAME_1 = Long.class.getCanonicalName(); + private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName(); + private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName(); + private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName(); + public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version"; + private static volatile int configVersion = -1; + private static AtomicInteger requestId = new AtomicInteger(0); + + private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON; + + static { + final String protocol = System.getProperty(SERIALIZE_TYPE_PROPERTY, System.getenv(SERIALIZE_TYPE_ENV)); + if (!isBlank(protocol)) { + try { + serializeTypeConfigInThisServer = SerializeType.valueOf(protocol); + } catch (IllegalArgumentException e) { + throw new RuntimeException("parser specified protocol error. protocol=" + protocol, e); + } + } + } + + /** + + */ + private int code; + private LanguageCode language = LanguageCode.JAVA; + private int version = 0; + private int opaque = requestId.getAndIncrement(); + private int flag = 0; + private String remark; + private HashMap<String, String> extFields; + private transient CommandCustomHeader customHeader; + /** + + */ + private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer; + /** + + */ + private transient byte[] body; + + + protected RemotingCommand() { + } + + public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) { + RemotingCommand cmd = new RemotingCommand(); + cmd.setCode(code); + cmd.customHeader = customHeader; + setCmdVersion(cmd); + return cmd; + } + + private static void setCmdVersion(RemotingCommand cmd) { + if (configVersion >= 0) { + cmd.setVersion(configVersion); + } else { + String v = System.getProperty(REMOTING_VERSION_KEY); + if (v != null) { + int value = Integer.parseInt(v); + cmd.setVersion(value); + configVersion = value; + } + } + } + + public static RemotingCommand createResponseCommand(Class<? extends CommandCustomHeader> classHeader) { + RemotingCommand cmd = createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader); + + return cmd; + } + + /** + + */ + public static RemotingCommand createResponseCommand(int code, String remark, Class<? extends CommandCustomHeader> classHeader) { + RemotingCommand cmd = new RemotingCommand(); + cmd.markResponseType(); + cmd.setCode(code); + cmd.setRemark(remark); + setCmdVersion(cmd); + + if (classHeader != null) { + try { + CommandCustomHeader objectHeader = classHeader.newInstance(); + cmd.customHeader = objectHeader; + } catch (InstantiationException e) { + return null; + } catch (IllegalAccessException e) { + return null; + } + } + + return cmd; + } + + public void markResponseType() { + int bits = 1 << RPC_TYPE; + this.flag |= bits; + } + + public static RemotingCommand createResponseCommand(int code, String remark) { + return createResponseCommand(code, remark, null); + } + + public static RemotingCommand decode(final byte[] array) { + ByteBuffer byteBuffer = ByteBuffer.wrap(array); + return decode(byteBuffer); + } + + public static RemotingCommand decode(final ByteBuffer byteBuffer) { + int length = byteBuffer.limit(); + int oriHeaderLen = byteBuffer.getInt(); + int headerLength = getHeaderLength(oriHeaderLen); + + byte[] headerData = new byte[headerLength]; + byteBuffer.get(headerData); + + RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); + + int bodyLength = length - 4 - headerLength; + byte[] bodyData = null; + if (bodyLength > 0) { + bodyData = new byte[bodyLength]; + byteBuffer.get(bodyData); + } + cmd.body = bodyData; + + return cmd; + } + + public static int getHeaderLength(int length) { + return length & 0xFFFFFF; + } + + private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) { + switch (type) { + case JSON: + RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class); + resultJson.setSerializeTypeCurrentRPC(type); + return resultJson; + case ROCKETMQ: + RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData); + resultRMQ.setSerializeTypeCurrentRPC(type); + return resultRMQ; + default: + break; + } + + return null; + } + + public static SerializeType getProtocolType(int source) { + return SerializeType.valueOf((byte) ((source >> 24) & 0xFF)); + } + + public static int createNewRequestId() { + return requestId.incrementAndGet(); + } + + public static SerializeType getSerializeTypeConfigInThisServer() { + return serializeTypeConfigInThisServer; + } + + private static boolean isBlank(String str) { + int strLen; + if (str == null || (strLen = str.length()) == 0) { + return true; + } + for (int i = 0; i < strLen; i++) { + if (!Character.isWhitespace(str.charAt(i))) { + return false; + } + } + return true; + } + + public CommandCustomHeader readCustomHeader() { + return customHeader; + } + + public void writeCustomHeader(CommandCustomHeader customHeader) { + this.customHeader = customHeader; + } + + public CommandCustomHeader decodeCommandCustomHeader(Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException { + CommandCustomHeader objectHeader; + try { + objectHeader = classHeader.newInstance(); + } catch (InstantiationException e) { + return null; + } catch (IllegalAccessException e) { + return null; + } + + if (this.extFields != null) { + + Field[] fields = getClazzFields(classHeader); + for (Field field : fields) { + if (!Modifier.isStatic(field.getModifiers())) { + String fieldName = field.getName(); + if (!fieldName.startsWith("this")) { + try { + String value = this.extFields.get(fieldName); + if (null == value) { + Annotation annotation = getNotNullAnnotation(field); + if (annotation != null) { + throw new RemotingCommandException("the custom field <" + fieldName + "> is null"); + } + + continue; + } + + field.setAccessible(true); + String type = getCanonicalName(field.getType()); + Object valueParsed; + + if (type.equals(STRING_CANONICAL_NAME)) { + valueParsed = value; + } else if (type.equals(INTEGER_CANONICAL_NAME_1) || type.equals(INTEGER_CANONICAL_NAME_2)) { + valueParsed = Integer.parseInt(value); + } else if (type.equals(LONG_CANONICAL_NAME_1) || type.equals(LONG_CANONICAL_NAME_2)) { + valueParsed = Long.parseLong(value); + } else if (type.equals(BOOLEAN_CANONICAL_NAME_1) || type.equals(BOOLEAN_CANONICAL_NAME_2)) { + valueParsed = Boolean.parseBoolean(value); + } else if (type.equals(DOUBLE_CANONICAL_NAME_1) || type.equals(DOUBLE_CANONICAL_NAME_2)) { + valueParsed = Double.parseDouble(value); + } else { + throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported"); + } + + field.set(objectHeader, valueParsed); + + } catch (Throwable e) { + } + } + } + } + + objectHeader.checkFields(); + } + + return objectHeader; + } + + private Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) { + Field[] field = CLASS_HASH_MAP.get(classHeader); + + if (field == null) { + field = classHeader.getDeclaredFields(); + synchronized (CLASS_HASH_MAP) { + CLASS_HASH_MAP.put(classHeader, field); + } + } + return field; + } + + private Annotation getNotNullAnnotation(Field field) { + Annotation annotation = NOT_NULL_ANNOTATION_CACHE.get(field); + + if (annotation == null) { + annotation = field.getAnnotation(CFNotNull.class); + synchronized (NOT_NULL_ANNOTATION_CACHE) { + NOT_NULL_ANNOTATION_CACHE.put(field, annotation); + } + } + return annotation; + } + + private String getCanonicalName(Class clazz) { + String name = CANONICAL_NAME_CACHE.get(clazz); + + if (name == null) { + name = clazz.getCanonicalName(); + synchronized (CANONICAL_NAME_CACHE) { + CANONICAL_NAME_CACHE.put(clazz, name); + } + } + return name; + } + + public ByteBuffer encode() { + // 1> header length size + int length = 4; + + // 2> header data length + byte[] headerData = this.headerEncode(); + length += headerData.length; + + // 3> body data length + if (this.body != null) { + length += body.length; + } + + ByteBuffer result = ByteBuffer.allocate(4 + length); + + // length + result.putInt(length); + + // header length + result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); + + // header data + result.put(headerData); + + // body data; + if (this.body != null) { + result.put(this.body); + } + + result.flip(); + + return result; + } + + private byte[] headerEncode() { + this.makeCustomHeaderToNet(); + if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) { + return RocketMQSerializable.rocketMQProtocolEncode(this); + } else { + return RemotingSerializable.encode(this); + } + } + + public static byte[] markProtocolType(int source, SerializeType type) { + byte[] result = new byte[4]; + + result[0] = type.getCode(); + result[1] = (byte) ((source >> 16) & 0xFF); + result[2] = (byte) ((source >> 8) & 0xFF); + result[3] = (byte) (source & 0xFF); + return result; + } + + public void makeCustomHeaderToNet() { + if (this.customHeader != null) { + Field[] fields = getClazzFields(customHeader.getClass()); + if (null == this.extFields) { + this.extFields = new HashMap<String, String>(); + } + + for (Field field : fields) { + if (!Modifier.isStatic(field.getModifiers())) { + String name = field.getName(); + if (!name.startsWith("this")) { + Object value = null; + try { + field.setAccessible(true); + value = field.get(this.customHeader); + } catch (IllegalArgumentException e) { + } catch (IllegalAccessException e) { + } + + if (value != null) { + this.extFields.put(name, value.toString()); + } + } + } + } + } + } + + public ByteBuffer encodeHeader() { + return encodeHeader(this.body != null ? this.body.length : 0); + } + + /** + + */ + public ByteBuffer encodeHeader(final int bodyLength) { + // 1> header length size + int length = 4; + + // 2> header data length + byte[] headerData; + headerData = this.headerEncode(); + + length += headerData.length; + + // 3> body data length + length += bodyLength; + + ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); + + // length + result.putInt(length); + + // header length + result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); + + // header data + result.put(headerData); + + result.flip(); + + return result; + } + + public void markOnewayRPC() { + int bits = 1 << RPC_ONEWAY; + this.flag |= bits; + } + + @JSONField(serialize = false) + public boolean isOnewayRPC() { + int bits = 1 << RPC_ONEWAY; + return (this.flag & bits) == bits; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + @JSONField(serialize = false) + public RemotingCommandType getType() { + if (this.isResponseType()) { + return RemotingCommandType.RESPONSE_COMMAND; + } + + return RemotingCommandType.REQUEST_COMMAND; + } + + @JSONField(serialize = false) + public boolean isResponseType() { + int bits = 1 << RPC_TYPE; + return (this.flag & bits) == bits; + } + + public LanguageCode getLanguage() { + return language; + } + + public void setLanguage(LanguageCode language) { + this.language = language; + } + + public int getVersion() { + return version; + } + + public void setVersion(int version) { + this.version = version; + } + + public int getOpaque() { + return opaque; + } + + public void setOpaque(int opaque) { + this.opaque = opaque; + } + + public int getFlag() { + return flag; + } + + public void setFlag(int flag) { + this.flag = flag; + } + + public String getRemark() { + return remark; + } + + public void setRemark(String remark) { + this.remark = remark; + } + + public byte[] getBody() { + return body; + } + + public void setBody(byte[] body) { + this.body = body; + } + + public HashMap<String, String> getExtFields() { + return extFields; + } + + public void setExtFields(HashMap<String, String> extFields) { + this.extFields = extFields; + } + + public void addExtField(String key, String value) { + if (null == extFields) { + extFields = new HashMap<String, String>(); + } + extFields.put(key, value); + } + + @Override + public String toString() { + return "RemotingCommand [code=" + code + ", language=" + language + ", version=" + version + ", opaque=" + opaque + ", flag(B)=" + + Integer.toBinaryString(flag) + ", remark=" + remark + ", extFields=" + extFields + ", serializeTypeCurrentRPC=" + + serializeTypeCurrentRPC + "]"; + } + + + public SerializeType getSerializeTypeCurrentRPC() { + return serializeTypeCurrentRPC; + } + + + public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) { + this.serializeTypeCurrentRPC = serializeTypeCurrentRPC; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommandType.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommandType.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommandType.java new file mode 100644 index 0000000..c0ac288 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommandType.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting.protocol; + +/** + * @author shijia.wxr + * + */ +public enum RemotingCommandType { + REQUEST_COMMAND, + RESPONSE_COMMAND; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSerializable.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSerializable.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSerializable.java new file mode 100644 index 0000000..c144ea6 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSerializable.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting.protocol; + +import com.alibaba.fastjson.JSON; + +import java.nio.charset.Charset; + + +/** + * @author shijia.wxr + * + */ +public abstract class RemotingSerializable { + public final static Charset CHARSET_UTF8 = Charset.forName("UTF-8"); + + public static byte[] encode(final Object obj) { + final String json = toJson(obj, false); + if (json != null) { + return json.getBytes(CHARSET_UTF8); + } + return null; + } + + public static String toJson(final Object obj, boolean prettyFormat) { + return JSON.toJSONString(obj, prettyFormat); + } + + public static <T> T decode(final byte[] data, Class<T> classOfT) { + final String json = new String(data, CHARSET_UTF8); + return fromJson(json, classOfT); + } + + public static <T> T fromJson(String json, Class<T> classOfT) { + return JSON.parseObject(json, classOfT); + } + + public byte[] encode() { + final String json = this.toJson(); + if (json != null) { + return json.getBytes(CHARSET_UTF8); + } + return null; + } + + public String toJson() { + return toJson(false); + } + + public String toJson(final boolean prettyFormat) { + return toJson(this, prettyFormat); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSysResponseCode.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSysResponseCode.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSysResponseCode.java new file mode 100644 index 0000000..085c750 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSysResponseCode.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.remoting.protocol; + +public class RemotingSysResponseCode { + + public static final int SUCCESS = 0; + + public static final int SYSTEM_ERROR = 1; + + public static final int SYSTEM_BUSY = 2; + + public static final int REQUEST_CODE_NOT_SUPPORTED = 3; + + public static final int TRANSACTION_FAILED = 4; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RocketMQSerializable.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RocketMQSerializable.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RocketMQSerializable.java new file mode 100644 index 0000000..6956aab --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RocketMQSerializable.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.remoting.protocol; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + + +/** + * @author manhong.yqd + * + */ +public class RocketMQSerializable { + public static final Charset CHARSET_UTF8 = Charset.forName("UTF-8"); + + public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) { + // String remark + byte[] remarkBytes = null; + int remarkLen = 0; + if (cmd.getRemark() != null && cmd.getRemark().length() > 0) { + remarkBytes = cmd.getRemark().getBytes(RemotingSerializable.CHARSET_UTF8); + remarkLen = remarkBytes.length; + } + + // HashMap<String, String> extFields + byte[] extFieldsBytes = null; + int extLen = 0; + if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) { + extFieldsBytes = mapSerialize(cmd.getExtFields()); + extLen = extFieldsBytes.length; + } + + // ################### cal total length + int totalLen = calTotalLen(remarkLen, extLen); + + // ################### content + ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen); + // int code(~32767) + headerBuffer.putShort((short) cmd.getCode()); + // LanguageCode language + headerBuffer.put(cmd.getLanguage().getCode()); + // int version(~32767) + headerBuffer.putShort((short) cmd.getVersion()); + // int opaque + headerBuffer.putInt(cmd.getOpaque()); + // int flag + headerBuffer.putInt(cmd.getFlag()); + // String remark + if (remarkBytes != null) { + headerBuffer.putInt(remarkBytes.length); + headerBuffer.put(remarkBytes); + } else { + headerBuffer.putInt(0); + } + // HashMap<String, String> extFields; + if (extFieldsBytes != null) { + headerBuffer.putInt(extFieldsBytes.length); + headerBuffer.put(extFieldsBytes); + } else { + headerBuffer.putInt(0); + } + + return headerBuffer.array(); + } + + public static byte[] mapSerialize(HashMap<String, String> map) { + // keySize+key+valSize+val + // keySize+key+valSize+val + if (null == map || map.isEmpty()) + return null; + + int totalLength = 0; + int kvLength; + Iterator<Map.Entry<String, String>> it = map.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<String, String> entry = it.next(); + if (entry.getKey() != null && entry.getValue() != null) { + kvLength = + // keySize + Key + 2 + entry.getKey().getBytes(RemotingSerializable.CHARSET_UTF8).length + // valSize + val + + 4 + entry.getValue().getBytes(RemotingSerializable.CHARSET_UTF8).length; + totalLength += kvLength; + } + } + + ByteBuffer content = ByteBuffer.allocate(totalLength); + byte[] key; + byte[] val; + it = map.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<String, String> entry = it.next(); + if (entry.getKey() != null && entry.getValue() != null) { + key = entry.getKey().getBytes(RemotingSerializable.CHARSET_UTF8); + val = entry.getValue().getBytes(RemotingSerializable.CHARSET_UTF8); + + content.putShort((short) key.length); + content.put(key); + + content.putInt(val.length); + content.put(val); + } + } + + return content.array(); + } + + private static int calTotalLen(int remark, int ext) { + // int code(~32767) + int length = 2 + // LanguageCode language + + 1 + // int version(~32767) + + 2 + // int opaque + + 4 + // int flag + + 4 + // String remark + + 4 + remark + // HashMap<String, String> extFields + + 4 + ext; + + return length; + } + + public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) { + RemotingCommand cmd = new RemotingCommand(); + ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray); + // int code(~32767) + cmd.setCode(headerBuffer.getShort()); + // LanguageCode language + cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get())); + // int version(~32767) + cmd.setVersion(headerBuffer.getShort()); + // int opaque + cmd.setOpaque(headerBuffer.getInt()); + // int flag + cmd.setFlag(headerBuffer.getInt()); + // String remark + int remarkLength = headerBuffer.getInt(); + if (remarkLength > 0) { + byte[] remarkContent = new byte[remarkLength]; + headerBuffer.get(remarkContent); + cmd.setRemark(new String(remarkContent, RemotingSerializable.CHARSET_UTF8)); + } + + // HashMap<String, String> extFields + int extFieldsLength = headerBuffer.getInt(); + if (extFieldsLength > 0) { + byte[] extFieldsBytes = new byte[extFieldsLength]; + headerBuffer.get(extFieldsBytes); + cmd.setExtFields(mapDeserialize(extFieldsBytes)); + } + return cmd; + } + + public static HashMap<String, String> mapDeserialize(byte[] bytes) { + if (bytes == null || bytes.length <= 0) + return null; + + HashMap<String, String> map = new HashMap<String, String>(); + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + + short keySize = 0; + byte[] keyContent = null; + int valSize = 0; + byte[] valContent = null; + while (byteBuffer.hasRemaining()) { + keySize = byteBuffer.getShort(); + keyContent = new byte[keySize]; + byteBuffer.get(keyContent); + + valSize = byteBuffer.getInt(); + valContent = new byte[valSize]; + byteBuffer.get(valContent); + + map.put(new String(keyContent, RemotingSerializable.CHARSET_UTF8), new String(valContent, + RemotingSerializable.CHARSET_UTF8)); + } + return map; + } + + + public static boolean isBlank(String str) { + int strLen; + if (str == null || (strLen = str.length()) == 0) { + return true; + } + for (int i = 0; i < strLen; i++) { + if (!Character.isWhitespace(str.charAt(i))) { + return false; + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/SerializeType.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/SerializeType.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/SerializeType.java new file mode 100644 index 0000000..b46d8d8 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/SerializeType.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.remoting.protocol; + +public enum SerializeType { + JSON((byte) 0), + ROCKETMQ((byte) 1); + + private byte code; + + SerializeType(byte code) { + this.code = code; + } + + public static SerializeType valueOf(byte code) { + for (SerializeType serializeType : SerializeType.values()) { + if (serializeType.getCode() == code) { + return serializeType; + } + } + return null; + } + + public byte getCode() { + return code; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.sevialize.txt ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.sevialize.txt b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.sevialize.txt new file mode 100644 index 0000000..976cdf2 --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.sevialize.txt @@ -0,0 +1,6 @@ +// +// Remoting protocol V0.1 draft +// +// protocol <length> <header length> <header data> <body data> +// 1 2 3 4 +// \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.txt ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.txt b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.txt new file mode 100644 index 0000000..7444cbb --- /dev/null +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.txt @@ -0,0 +1,6 @@ +// +// Remoting protocol V0.1 draft +// +// protocol <length> <header length> <header data> <body data> +// 1 2 3 4 +// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/ExceptionTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/ExceptionTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/ExceptionTest.java new file mode 100644 index 0000000..6baa013 --- /dev/null +++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/ExceptionTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: ExceptionTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package com.alibaba.rocketmq.remoting; + +import com.alibaba.rocketmq.remoting.exception.RemotingConnectException; +import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; +import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; +import com.alibaba.rocketmq.remoting.netty.*; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import io.netty.channel.ChannelHandlerContext; +import org.junit.Test; + +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertTrue; + + +/** + * @author shijia.wxr + */ +public class ExceptionTest { + private static RemotingServer createRemotingServer() throws InterruptedException { + NettyServerConfig config = new NettyServerConfig(); + RemotingServer client = new NettyRemotingServer(config); + client.registerProcessor(0, new NettyRequestProcessor() { + private int i = 0; + + + @Override + public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { + System.out.println("processRequest=" + request + " " + (i++)); + request.setRemark("hello, I am respponse " + ctx.channel().remoteAddress()); + return request; + } + + @Override + public boolean rejectRequest() { + return false; + } + }, Executors.newCachedThreadPool()); + client.start(); + return client; + } + + @Test + public void test_CONNECT_EXCEPTION() { + RemotingClient client = createRemotingClient(); + + RemotingCommand request = RemotingCommand.createRequestCommand(0, null); + RemotingCommand response = null; + try { + response = client.invokeSync("localhost:8888", request, 1000 * 3); + } catch (RemotingConnectException e) { + e.printStackTrace(); + } catch (RemotingSendRequestException e) { + e.printStackTrace(); + } catch (RemotingTimeoutException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println("invoke result = " + response); + assertTrue(null == response); + + client.shutdown(); + System.out.println("-----------------------------------------------------------------"); + } + + private static RemotingClient createRemotingClient() { + NettyClientConfig config = new NettyClientConfig(); + RemotingClient client = new NettyRemotingClient(config); + client.start(); + return client; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/MixTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/MixTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/MixTest.java new file mode 100644 index 0000000..97d1663 --- /dev/null +++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/MixTest.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: MixTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package com.alibaba.rocketmq.remoting; + +import org.junit.Test; + + +/** + * @author shijia.wxr + */ +public class MixTest { + @Test + public void test_extFieldsValue() { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java new file mode 100644 index 0000000..e4ff948 --- /dev/null +++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.remoting; + +import com.alibaba.rocketmq.remoting.exception.RemotingConnectException; +import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; +import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; +import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; +import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import org.junit.Test; + + +/** + + * + * @author shijia.wxr + * + */ +public class NettyConnectionTest { + @Test + public void test_connect_timeout() throws InterruptedException, RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException { + RemotingClient client = createRemotingClient(); + + for (int i = 0; i < 100; i++) { + try { + RemotingCommand request = RemotingCommand.createRequestCommand(0, null); + RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3); + } catch (Exception e) { + e.printStackTrace(); + } + } + + client.shutdown(); + System.out.println("-----------------------------------------------------------------"); + } + + public static RemotingClient createRemotingClient() { + NettyClientConfig config = new NettyClientConfig(); + config.setClientChannelMaxIdleTimeSeconds(15); + RemotingClient client = new NettyRemotingClient(config); + client.start(); + return client; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyIdleTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyIdleTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyIdleTest.java new file mode 100644 index 0000000..23145f4 --- /dev/null +++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyIdleTest.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.remoting; + +import com.alibaba.rocketmq.remoting.exception.RemotingConnectException; +import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; +import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; +import com.alibaba.rocketmq.remoting.netty.*; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import io.netty.channel.ChannelHandlerContext; + +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertTrue; + + +/** + * @author shijia.wxr + * + */ +public class NettyIdleTest { + // @Test + public void test_idle_event() throws InterruptedException, RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException { + RemotingServer server = createRemotingServer(); + RemotingClient client = createRemotingClient(); + + for (int i = 0; i < 10; i++) { + RemotingCommand request = RemotingCommand.createRequestCommand(0, null); + RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3); + System.out.println(i + " invoke result = " + response); + assertTrue(response != null); + + Thread.sleep(1000 * 10); + } + + Thread.sleep(1000 * 60); + + client.shutdown(); + server.shutdown(); + System.out.println("-----------------------------------------------------------------"); + } + + public static RemotingServer createRemotingServer() throws InterruptedException { + NettyServerConfig config = new NettyServerConfig(); + config.setServerChannelMaxIdleTimeSeconds(30); + RemotingServer remotingServer = new NettyRemotingServer(config); + remotingServer.registerProcessor(0, new NettyRequestProcessor() { + private int i = 0; + + + @Override + public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { + System.out.println("processRequest=" + request + " " + (i++)); + request.setRemark("hello, I am respponse " + ctx.channel().remoteAddress()); + return request; + } + + @Override + public boolean rejectRequest() { + return false; + } + }, Executors.newCachedThreadPool()); + remotingServer.start(); + return remotingServer; + } + + public static RemotingClient createRemotingClient() { + NettyClientConfig config = new NettyClientConfig(); + config.setClientChannelMaxIdleTimeSeconds(15); + RemotingClient client = new NettyRemotingClient(config); + client.start(); + return client; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyRPCTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyRPCTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyRPCTest.java new file mode 100644 index 0000000..7d4f68e --- /dev/null +++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyRPCTest.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: NettyRPCTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package com.alibaba.rocketmq.remoting; + +import com.alibaba.rocketmq.remoting.annotation.CFNullable; +import com.alibaba.rocketmq.remoting.exception.*; +import com.alibaba.rocketmq.remoting.netty.*; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import io.netty.channel.ChannelHandlerContext; +import org.junit.Test; + +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertTrue; + + +/** + * @author shijia.wxr + */ +public class NettyRPCTest { + @Test + public void test_RPC_Sync() throws InterruptedException, RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException { + RemotingServer server = createRemotingServer(); + RemotingClient client = createRemotingClient(); + + for (int i = 0; i < 100; i++) { + TestRequestHeader requestHeader = new TestRequestHeader(); + requestHeader.setCount(i); + requestHeader.setMessageTitle("HelloMessageTitle"); + RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader); + RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3000); + System.out.println("invoke result = " + response); + assertTrue(response != null); + } + + client.shutdown(); + server.shutdown(); + System.out.println("-----------------------------------------------------------------"); + } + + public static RemotingServer createRemotingServer() throws InterruptedException { + NettyServerConfig config = new NettyServerConfig(); + RemotingServer remotingServer = new NettyRemotingServer(config); + remotingServer.registerProcessor(0, new NettyRequestProcessor() { + private int i = 0; + + + @Override + public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { + System.out.println("processRequest=" + request + " " + (i++)); + request.setRemark("hello, I am respponse " + ctx.channel().remoteAddress()); + return request; + } + + @Override + public boolean rejectRequest() { + return false; + } + }, Executors.newCachedThreadPool()); + remotingServer.start(); + return remotingServer; + } + + public static RemotingClient createRemotingClient() { + NettyClientConfig config = new NettyClientConfig(); + RemotingClient client = new NettyRemotingClient(config); + client.start(); + return client; + } + + @Test + public void test_RPC_Oneway() throws InterruptedException, RemotingConnectException, + RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException { + RemotingServer server = createRemotingServer(); + RemotingClient client = createRemotingClient(); + + for (int i = 0; i < 100; i++) { + RemotingCommand request = RemotingCommand.createRequestCommand(0, null); + request.setRemark(String.valueOf(i)); + client.invokeOneway("localhost:8888", request, 1000 * 3); + } + + client.shutdown(); + server.shutdown(); + System.out.println("-----------------------------------------------------------------"); + } + + + @Test + public void test_RPC_Async() throws InterruptedException, RemotingConnectException, + RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException { + RemotingServer server = createRemotingServer(); + RemotingClient client = createRemotingClient(); + + for (int i = 0; i < 100; i++) { + RemotingCommand request = RemotingCommand.createRequestCommand(0, null); + request.setRemark(String.valueOf(i)); + client.invokeAsync("localhost:8888", request, 1000 * 3, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + System.out.println(responseFuture.getResponseCommand()); + } + }); + } + + Thread.sleep(1000 * 3); + + client.shutdown(); + server.shutdown(); + System.out.println("-----------------------------------------------------------------"); + } + + + @Test + public void test_server_call_client() throws InterruptedException, RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException { + final RemotingServer server = createRemotingServer(); + final RemotingClient client = createRemotingClient(); + + server.registerProcessor(0, new NettyRequestProcessor() { + @Override + public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { + try { + return server.invokeSync(ctx.channel(), request, 1000 * 10); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (RemotingSendRequestException e) { + e.printStackTrace(); + } catch (RemotingTimeoutException e) { + e.printStackTrace(); + } + + return null; + } + + @Override + public boolean rejectRequest() { + return false; + } + }, Executors.newCachedThreadPool()); + + client.registerProcessor(0, new NettyRequestProcessor() { + @Override + public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { + System.out.println("client receive server request = " + request); + request.setRemark("client remark"); + return request; + } + + @Override + public boolean rejectRequest() { + return false; + } + }, Executors.newCachedThreadPool()); + + for (int i = 0; i < 3; i++) { + RemotingCommand request = RemotingCommand.createRequestCommand(0, null); + RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3); + System.out.println("invoke result = " + response); + assertTrue(response != null); + } + + client.shutdown(); + server.shutdown(); + System.out.println("-----------------------------------------------------------------"); + } + +} + + +class TestRequestHeader implements CommandCustomHeader { + @CFNullable + private Integer count; + + @CFNullable + private String messageTitle; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + + public Integer getCount() { + return count; + } + + + public void setCount(Integer count) { + this.count = count; + } + + + public String getMessageTitle() { + return messageTitle; + } + + + public void setMessageTitle(String messageTitle) { + this.messageTitle = messageTitle; + } +} + + +class TestResponseHeader implements CommandCustomHeader { + @CFNullable + private Integer count; + + @CFNullable + private String messageTitle; + + public Integer getCount() { + return count; + } + + public void setCount(Integer count) { + this.count = count; + } + + @Override + public void checkFields() throws RemotingCommandException { + + } + + public String getMessageTitle() { + return messageTitle; + } + + public void setMessageTitle(String messageTitle) { + this.messageTitle = messageTitle; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/SyncInvokeTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/SyncInvokeTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/SyncInvokeTest.java new file mode 100644 index 0000000..fc3e708 --- /dev/null +++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/SyncInvokeTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: SyncInvokeTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package com.alibaba.rocketmq.remoting; + +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + + +/** + * @author shijia.wxr + */ +public class SyncInvokeTest { + @Test + public void test_RPC_Sync() throws Exception { + RemotingServer server = NettyRPCTest.createRemotingServer(); + RemotingClient client = NettyRPCTest.createRemotingClient(); + + for (int i = 0; i < 100; i++) { + try { + RemotingCommand request = RemotingCommand.createRequestCommand(0, null); + RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3); + System.out.println(i + "\t" + "invoke result = " + response); + assertTrue(response != null); + } catch (Exception e) { + e.printStackTrace(); + throw e; + } + } + + client.shutdown(); + server.shutdown(); + System.out.println("-----------------------------------------------------------------"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/subclass/TestSubClassAuto.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/subclass/TestSubClassAuto.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/subclass/TestSubClassAuto.java new file mode 100644 index 0000000..3d18d23 --- /dev/null +++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/subclass/TestSubClassAuto.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * + */ +package com.alibaba.rocketmq.subclass; + +import org.junit.Test; + + +/** + * @author shijia.wxr + */ +public class TestSubClassAuto { + @Test + public void test_sub() { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-srvutil/pom.xml ---------------------------------------------------------------------- diff --git a/rocketmq-srvutil/pom.xml b/rocketmq-srvutil/pom.xml new file mode 100644 index 0000000..98b4e29 --- /dev/null +++ b/rocketmq-srvutil/pom.xml @@ -0,0 +1,51 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain producerGroup copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>com.alibaba.rocketmq</groupId> + <artifactId>rocketmq-all</artifactId> + <version>4.0.0-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <packaging>jar</packaging> + <artifactId>rocketmq-srvutil</artifactId> + <name>rocketmq-srvutil ${project.version}</name> + + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>rocketmq-remoting</artifactId> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>rocketmq-common</artifactId> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-srvutil/src/main/java/com/alibaba/rocketmq/srvutil/ServerUtil.java ---------------------------------------------------------------------- diff --git a/rocketmq-srvutil/src/main/java/com/alibaba/rocketmq/srvutil/ServerUtil.java b/rocketmq-srvutil/src/main/java/com/alibaba/rocketmq/srvutil/ServerUtil.java new file mode 100644 index 0000000..72fa9a4 --- /dev/null +++ b/rocketmq-srvutil/src/main/java/com/alibaba/rocketmq/srvutil/ServerUtil.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.srvutil; + +import org.apache.commons.cli.*; + +import java.util.Properties; + +public class ServerUtil { + + public static Options buildCommandlineOptions(final Options options) { + Option opt = new Option("h", "help", false, "Print help"); + opt.setRequired(false); + options.addOption(opt); + + opt = + new Option("n", "namesrvAddr", true, + "Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + + public static CommandLine parseCmdLine(final String appName, String[] args, Options options, + CommandLineParser parser) { + HelpFormatter hf = new HelpFormatter(); + hf.setWidth(110); + CommandLine commandLine = null; + try { + commandLine = parser.parse(options, args); + if (commandLine.hasOption('h')) { + hf.printHelp(appName, options, true); + return null; + } + } catch (ParseException e) { + hf.printHelp(appName, options, true); + } + + return commandLine; + } + + + public static void printCommandLineHelp(final String appName, final Options options) { + HelpFormatter hf = new HelpFormatter(); + hf.setWidth(110); + hf.printHelp(appName, options, true); + } + + + public static Properties commandLine2Properties(final CommandLine commandLine) { + Properties properties = new Properties(); + Option[] opts = commandLine.getOptions(); + + if (opts != null) { + for (Option opt : opts) { + String name = opt.getLongOpt(); + String value = commandLine.getOptionValue(name); + if (value != null) { + properties.setProperty(name, value); + } + } + } + + return properties; + } + +}
