This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git
commit bc9df7119a36324a8ab3153b96f7945dc16cc10c Author: yukon <[email protected]> AuthorDate: Thu May 16 13:25:31 2019 +0800 Polish the protocol/encoder/decoder etc. --- .../rocketmq/remoting/api/RemotingMarshaller.java | 24 --- .../rocketmq/remoting/api/RemotingService.java | 2 +- .../remoting/api/command/RemotingCommand.java | 52 +---- .../{compressable => compression}/Compressor.java | 2 +- .../CompressorFactory.java | 2 +- .../remoting/api/serializable/Serializer.java | 36 ---- remoting-core/remoting-impl/pom.xml | 19 +- .../rocketmq/remoting/config/RemotingConfig.java | 12 +- .../remoting/impl/command/CodecHelper.java | 221 ++++++++++----------- .../impl/command/RemotingCommandFactoryImpl.java | 22 +- .../remoting/impl/command/RemotingCommandImpl.java | 104 ++-------- .../impl/command/RemotingSysResponseCode.java} | 15 +- .../compression/CompressorFactoryImpl.java | 6 +- .../{protocol => }/compression/GZipCompressor.java | 4 +- .../remoting/impl/netty/NettyRemotingAbstract.java | 28 +-- .../remoting/impl/netty/handler/Decoder.java | 23 +-- .../remoting/impl/netty/handler/Encoder.java | 33 +-- .../impl/protocol/serializer/JsonSerializer.java | 88 -------- .../impl/protocol/serializer/Kryo3Serializer.java | 90 --------- .../protocol/serializer/MsgPackSerializer.java | 78 -------- .../protocol/serializer/SerializerFactoryImpl.java | 69 ------- .../impl/protocol/serializer/ThreadSafeKryo.java | 99 --------- 22 files changed, 185 insertions(+), 844 deletions(-) diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java deleted file mode 100644 index 62c9dda..0000000 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.api; - -import org.apache.rocketmq.remoting.api.serializable.SerializerFactory; - -public interface RemotingMarshaller { - SerializerFactory serializerFactory(); -} diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java index 2f6343c..8802148 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java @@ -22,7 +22,7 @@ import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory; import org.apache.rocketmq.remoting.api.interceptor.Interceptor; import org.apache.rocketmq.remoting.common.Pair; -public interface RemotingService extends RemotingMarshaller, ConnectionService, ObjectLifecycle { +public interface RemotingService extends ConnectionService, ObjectLifecycle { void registerInterceptor(Interceptor interceptor); void registerRequestProcessor(final String requestCode, final RequestProcessor processor, diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/command/RemotingCommand.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/command/RemotingCommand.java index f21a45d..8496b04 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/command/RemotingCommand.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/command/RemotingCommand.java @@ -17,31 +17,28 @@ package org.apache.rocketmq.remoting.api.command; -import java.lang.reflect.Type; import java.util.Map; -import org.apache.rocketmq.remoting.api.serializable.SerializerFactory; -import org.apache.rocketmq.remoting.common.TypePresentation; public interface RemotingCommand { - byte protocolType(); + short cmdCode(); - void protocolType(byte value); + void cmdCode(short code); - int requestID(); + short cmdVersion(); - void requestID(int value); + void cmdVersion(short version); - byte serializerType(); + int requestID(); - void serializerType(byte value); + void requestID(int value); TrafficType trafficType(); void trafficType(TrafficType value); - String opCode(); + short opCode(); - void opCode(String value); + void opCode(short value); String remark(); @@ -55,36 +52,7 @@ public interface RemotingCommand { void property(String key, String value); - Object parameter(); - - void parameter(Object value); - - byte[] parameterBytes(); - - void parameterBytes(byte[] value); - - byte[] extraPayload(); - - void extraPayload(byte[] value); - - <T> T parameter(final SerializerFactory serializerFactory, Class<T> c); - - <T> T parameter(final SerializerFactory serializerFactory, final TypePresentation<T> typePresentation); - - <T> T parameter(final SerializerFactory serializerFactory, final Type type); - - enum CommandFlag { - SUCCESS("0"), - ERROR("-1"); - - private String flag; - - CommandFlag(final String flag) { - this.flag = flag; - } + byte[] payload(); - public String flag() { - return flag; - } - } + void payload(byte[] payload); } \ No newline at end of file diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compressable/Compressor.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compression/Compressor.java similarity index 94% rename from remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compressable/Compressor.java rename to remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compression/Compressor.java index 4688c45..d5c378e 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compressable/Compressor.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compression/Compressor.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.remoting.api.compressable; +package org.apache.rocketmq.remoting.api.compression; public interface Compressor { String name(); diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compressable/CompressorFactory.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compression/CompressorFactory.java similarity index 94% rename from remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compressable/CompressorFactory.java rename to remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compression/CompressorFactory.java index 2494c78..4afd599 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compressable/CompressorFactory.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/compression/CompressorFactory.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.remoting.api.compressable; +package org.apache.rocketmq.remoting.api.compression; public interface CompressorFactory { void register(Compressor compressor); diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/Serializer.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/Serializer.java deleted file mode 100644 index 8ef8dcd..0000000 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/Serializer.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.api.serializable; - -import java.lang.reflect.Type; -import java.nio.ByteBuffer; -import org.apache.rocketmq.remoting.common.TypePresentation; - -public interface Serializer { - String name(); - - byte type(); - - <T> T decode(final byte[] content, final Class<T> c); - - <T> T decode(final byte[] content, final TypePresentation<T> typePresentation); - - <T> T decode(final byte[] content, final Type type); - - ByteBuffer encode(final Object object); -} diff --git a/remoting-core/remoting-impl/pom.xml b/remoting-core/remoting-impl/pom.xml index c8be150..604416b 100644 --- a/remoting-core/remoting-impl/pom.xml +++ b/remoting-core/remoting-impl/pom.xml @@ -25,22 +25,25 @@ <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> - </dependency> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-all</artifactId> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> + <scope>provided</scope> </dependency> <dependency> <groupId>com.esotericsoftware</groupId> <artifactId>kryo</artifactId> + <scope>provided</scope> </dependency> <dependency> <groupId>org.msgpack</groupId> <artifactId>msgpack</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> </dependency> <dependency> <groupId>io.netty</groupId> diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java index f7a4b67..3f5282c 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java @@ -19,8 +19,7 @@ package org.apache.rocketmq.remoting.config; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; -import org.apache.rocketmq.remoting.impl.protocol.compression.GZipCompressor; -import org.apache.rocketmq.remoting.impl.protocol.serializer.JsonSerializer; +import org.apache.rocketmq.remoting.impl.compression.GZipCompressor; public class RemotingConfig extends TcpSocketConfig { private int connectionMaxRetries = 3; @@ -37,7 +36,6 @@ public class RemotingConfig extends TcpSocketConfig { private int threadTaskLowWaterMark = 30000; private int threadTaskHighWaterMark = 50000; private int connectionRetryBackoffMillis = 3000; - private String serializerName = JsonSerializer.SERIALIZER_NAME; private String compressorName = GZipCompressor.COMPRESSOR_NAME; private int serviceThreadBlockQueueSize = 50000; private boolean clientNativeEpollEnable = false; @@ -147,14 +145,6 @@ public class RemotingConfig extends TcpSocketConfig { this.connectionRetryBackoffMillis = connectionRetryBackoffMillis; } - public String getSerializerName() { - return serializerName; - } - - public void setSerializerName(final String serializerName) { - this.serializerName = serializerName; - } - public String getCompressorName() { return compressorName; } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java index 44d4fd9..bfc536b 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java @@ -17,36 +17,43 @@ package org.apache.rocketmq.remoting.impl.command; -import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.Map.Entry; +import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper; import org.apache.rocketmq.remoting.api.command.RemotingCommand; import org.apache.rocketmq.remoting.api.command.TrafficType; import org.apache.rocketmq.remoting.api.exception.RemoteCodecException; public class CodecHelper { - //ProtocolType + TotalLength + RequestId + SerializeType + TrafficType + CodeLength + RemarkLength + PropertiesSize + ParameterLength - public final static int MIN_PROTOCOL_LEN = 1 + 4 + 4 + 1 + 1 + 2 + 2 + 2 + 4; - public final static char PROPERTY_SEPARATOR = '\n'; - public final static Charset REMOTING_CHARSET = Charset.forName("UTF-8"); - - public final static int CODE_MAX_LEN = 512; - public final static int PARAMETER_MAX_LEN = 33554432; - public final static int BODY_MAX_LEN = 33554432; - public final static int PACKET_MAX_LEN = 33554432; - - public static ByteBuffer encodeHeader(final RemotingCommand command, final int parameterLength, - final int extraPayload) { - byte[] code = command.opCode().getBytes(REMOTING_CHARSET); - int codeLength = code.length; - - byte[] remark = command.remark().getBytes(REMOTING_CHARSET); - int remarkLength = remark.length; + // ProtocolMagic(1) + TotalLength(4) + CmdCode(2) + CmdVersion(2) + RequestID(4) + TrafficType(1) + OpCode(2) + // + RemarkLen(2) + PropertiesSize(2) + PayloadLen(4); + public final static int MIN_PROTOCOL_LEN = 1 + 4 + 2 + 2 + 4 + 1 + 2 + 2 + 2 + 4; + private final static char PROPERTY_SEPARATOR = '\n'; + private final static Charset REMOTING_CHARSET = Charset.forName("UTF-8"); + + public final static byte PROTOCOL_MAGIC = 0x14; + private final static int REMARK_MAX_LEN = Short.MAX_VALUE; + private final static int PROPERTY_MAX_LEN = 524288; // 512KB + private final static int PAYLOAD_MAX_LEN = 16777216; // 16MB + public final static int PACKET_MAX_LEN = MIN_PROTOCOL_LEN + REMARK_MAX_LEN + PROPERTY_MAX_LEN + PAYLOAD_MAX_LEN; + + public static void encodeCommand(final RemotingCommand command, final ByteBufferWrapper out) { + out.writeByte(PROTOCOL_MAGIC); + + short remarkLen = 0; + byte [] remark = null; + if (command.remark() != null) { + remark = command.remark().getBytes(REMOTING_CHARSET); + if (remark.length > REMARK_MAX_LEN) { + throw new RemoteCodecException(String.format("Remark len: %d over max limit: %d", remark.length, REMARK_MAX_LEN)); + } + remarkLen = (short) remark.length; + } byte[][] props = null; - int propsLength = 0; + int propsLen = 0; StringBuilder sb = new StringBuilder(); - if (!command.properties().isEmpty()) { + if (command.properties() != null && !command.properties().isEmpty()) { props = new byte[command.properties().size()][]; int i = 0; for (Entry<String, String> next : command.properties().entrySet()) { @@ -57,122 +64,110 @@ public class CodecHelper { props[i] = sb.toString().getBytes(REMOTING_CHARSET); - propsLength += 2; - propsLength += props[i].length; + if (props[i].length > Short.MAX_VALUE) { + throw new RemoteCodecException(String.format("Property KV len: %d over max limit: %d", props[i].length, Short.MAX_VALUE)); + } + + propsLen += 2; + propsLen += props[i].length; i++; } } - int totalLength = MIN_PROTOCOL_LEN - 1 - 4 - + codeLength - + remarkLength - + propsLength - + parameterLength - + extraPayload; - - int headerLength = 1 + 4 + totalLength - parameterLength - extraPayload; - - ByteBuffer buf = ByteBuffer.allocate(headerLength); - buf.put(command.protocolType()); - buf.putInt(totalLength); - buf.putInt(command.requestID()); - buf.put(command.serializerType()); - buf.put((byte) command.trafficType().ordinal()); - - buf.putShort((short) codeLength); - if (codeLength > 0) { - buf.put(code); + if (propsLen > PROPERTY_MAX_LEN) { + throw new RemoteCodecException(String.format("Properties total len: %d over max limit: %d", propsLen, PROPERTY_MAX_LEN)); } - buf.putShort((short) remarkLength); - if (remarkLength > 0) { - buf.put(remark); + + int payloadLen = command.payload() == null ? 0 : command.payload().length; + + if (payloadLen > PAYLOAD_MAX_LEN) { + throw new RemoteCodecException(String.format("Payload len: %d over max limit: %d", payloadLen, PAYLOAD_MAX_LEN)); + } + + int totalLength = MIN_PROTOCOL_LEN + + remarkLen + + propsLen + + payloadLen; + + out.writeInt(totalLength); + out.writeShort(command.cmdCode()); + out.writeShort(command.cmdVersion()); + out.writeInt(command.requestID()); + out.writeByte((byte) command.trafficType().ordinal()); + out.writeShort(command.opCode()); + + out.writeShort(remarkLen); + if (remarkLen != 0) { + out.writeBytes(remark); } - if (props != null) { - buf.putShort((short) props.length); + + if (propsLen != 0) { + out.writeShort((short) props.length); for (byte[] prop : props) { - buf.putShort((short) prop.length); - buf.put(prop); + out.writeShort((short) prop.length); + out.writeBytes(prop); } - } else { - buf.putShort((short) 0); } - buf.putInt(parameterLength); - - buf.flip(); - - return buf; + out.writeInt(payloadLen); + if (payloadLen != 0) { + out.writeBytes(command.payload()); + } } - public static RemotingCommand decode(final ByteBuffer byteBuffer) { + public static RemotingCommand decode(final ByteBufferWrapper in) { RemotingCommandImpl cmd = new RemotingCommandImpl(); - int totalLength = byteBuffer.limit(); - cmd.requestID(byteBuffer.getInt()); - cmd.serializerType(byteBuffer.get()); - cmd.trafficType(TrafficType.parse(byteBuffer.get())); - - { - short size = byteBuffer.getShort(); - if (size > 0 && size <= CODE_MAX_LEN) { - byte[] bytes = new byte[size]; - byteBuffer.get(bytes); - String str = new String(bytes, REMOTING_CHARSET); - cmd.opCode(str); - } else { - throw new RemoteCodecException(String.format("Code length: %d over max limit: %d", size, CODE_MAX_LEN)); - } - } - { - short size = byteBuffer.getShort(); - if (size > 0) { - byte[] bytes = new byte[size]; - byteBuffer.get(bytes); - String str = new String(bytes, REMOTING_CHARSET); - cmd.remark(str); - } + cmd.cmdCode(in.readShort()); + cmd.cmdVersion(in.readShort()); + cmd.requestID(in.readInt()); + cmd.trafficType(TrafficType.parse(in.readByte())); + cmd.opCode(in.readShort()); + + + short remarkLen = in.readShort(); + if (remarkLen > 0) { + byte[] bytes = new byte[remarkLen]; + in.readBytes(bytes); + String str = new String(bytes, REMOTING_CHARSET); + cmd.remark(str); } - { - short size = byteBuffer.getShort(); - if (size > 0) { - for (int i = 0; i < size; i++) { - short length = byteBuffer.getShort(); - if (length > 0) { - byte[] bytes = new byte[length]; - byteBuffer.get(bytes); - String str = new String(bytes, REMOTING_CHARSET); - int index = str.indexOf(PROPERTY_SEPARATOR); - if (index > 0) { - String key = str.substring(0, index); - String value = str.substring(index + 1); - cmd.property(key, value); - } + short propsSize = in.readShort(); + int propsLen = 0; + if (propsSize > 0) { + for (int i = 0; i < propsSize; i++) { + short length = in.readShort(); + if (length > 0) { + byte[] bytes = new byte[length]; + in.readBytes(bytes); + String str = new String(bytes, REMOTING_CHARSET); + int index = str.indexOf(PROPERTY_SEPARATOR); + if (index > 0) { + String key = str.substring(0, index); + String value = str.substring(index + 1); + cmd.property(key, value); } } + + propsLen += 2; + propsLen += length; + if (propsLen > PROPERTY_MAX_LEN) { + throw new RemoteCodecException(String.format("Properties total len: %d over max limit: %d", propsLen, PROPERTY_MAX_LEN)); + } } } - { - int size = byteBuffer.getInt(); - if (size > 0 && size <= PARAMETER_MAX_LEN) { - byte[] bytes = new byte[size]; - byteBuffer.get(bytes); - cmd.parameterBytes(bytes); - } else if (size != 0) { - throw new RemoteCodecException(String.format("Parameter size: %d over max limit: %d", size, PARAMETER_MAX_LEN)); - } + int payloadLen = in.readInt(); + + if (payloadLen > PAYLOAD_MAX_LEN) { + throw new RemoteCodecException(String.format("Payload len: %d over max limit: %d", payloadLen, PAYLOAD_MAX_LEN)); } - { - int size = totalLength - byteBuffer.position(); - if (size > 0 && size <= BODY_MAX_LEN) { - byte[] bytes = new byte[size]; - byteBuffer.get(bytes); - cmd.extraPayload(bytes); - } else if (size != 0) { - throw new RemoteCodecException(String.format("Body size: %d over max limit: %d", size, BODY_MAX_LEN)); - } + if (payloadLen > 0) { + byte[] bytes = new byte[payloadLen]; + in.readBytes(bytes); + cmd.payload(bytes); } return cmd; diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java index 6e1efaa..adbb42c 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java @@ -20,37 +20,23 @@ package org.apache.rocketmq.remoting.impl.command; import org.apache.rocketmq.remoting.api.command.RemotingCommand; import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory; import org.apache.rocketmq.remoting.api.command.TrafficType; -import org.apache.rocketmq.remoting.api.serializable.SerializerFactory; -import org.apache.rocketmq.remoting.impl.protocol.serializer.JsonSerializer; -import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl; public class RemotingCommandFactoryImpl implements RemotingCommandFactory { - private final SerializerFactory serializerFactory = new SerializerFactoryImpl(); - private byte serializeType = JsonSerializer.SERIALIZER_TYPE; - - private byte PROTOCOL_MAGIC = 0x14; - public RemotingCommandFactoryImpl() { } - public RemotingCommandFactoryImpl(final String serializeName) { - this.serializeType = serializerFactory.type(serializeName); - } - @Override public RemotingCommand createRequest() { RemotingCommand request = new RemotingCommandImpl(); - request.protocolType(this.PROTOCOL_MAGIC); - request.serializerType(this.serializeType); return request; } @Override - public RemotingCommand createResponse(final RemotingCommand command) { + public RemotingCommand createResponse(final RemotingCommand request) { RemotingCommand response = new RemotingCommandImpl(); - response.requestID(command.requestID()); - response.protocolType(command.protocolType()); - response.serializerType(command.serializerType()); + response.cmdCode(request.cmdCode()); + response.cmdVersion(request.cmdVersion()); + response.requestID(request.requestID()); response.trafficType(TrafficType.RESPONSE); return response; } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java index bcf2338..b405eda 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.remoting.impl.command; -import java.lang.reflect.Type; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang3.builder.EqualsBuilder; @@ -26,39 +25,23 @@ import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.rocketmq.remoting.api.command.RemotingCommand; import org.apache.rocketmq.remoting.api.command.TrafficType; -import org.apache.rocketmq.remoting.api.serializable.SerializerFactory; -import org.apache.rocketmq.remoting.common.TypePresentation; public class RemotingCommandImpl implements RemotingCommand { public final static RequestIdGenerator REQUEST_ID_GENERATOR = RequestIdGenerator.inst; - private byte protocolType; - private byte serializeType; - + private short cmdCode; + private short cmdVersion; private volatile int requestId = REQUEST_ID_GENERATOR.incrementAndGet(); private TrafficType trafficType = TrafficType.REQUEST_SYNC; - private String code = CommandFlag.SUCCESS.flag(); + private short opCode = RemotingSysResponseCode.SUCCESS; private String remark = ""; - private Map<String, String> properties = new HashMap<String, String>(); - private Object parameter; - private byte[] extraPayload; - - private byte[] parameterByte; + private Map<String, String> properties = new HashMap<>(); + private byte[] payload; protected RemotingCommandImpl() { } @Override - public byte protocolType() { - return this.protocolType; - } - - @Override - public void protocolType(byte value) { - this.protocolType = value; - } - - @Override public int requestID() { return requestId; } @@ -69,16 +52,6 @@ public class RemotingCommandImpl implements RemotingCommand { } @Override - public byte serializerType() { - return this.serializeType; - } - - @Override - public void serializerType(byte value) { - this.serializeType = value; - } - - @Override public TrafficType trafficType() { return this.trafficType; } @@ -89,13 +62,13 @@ public class RemotingCommandImpl implements RemotingCommand { } @Override - public String opCode() { - return this.code; + public short opCode() { + return this.opCode; } @Override - public void opCode(String value) { - this.code = value; + public void opCode(short value) { + this.opCode = value; } @Override @@ -129,68 +102,33 @@ public class RemotingCommandImpl implements RemotingCommand { } @Override - public Object parameter() { - return this.parameter; - } - - @Override - public void parameter(Object value) { - this.parameter = value; - } - - @Override - public byte[] parameterBytes() { - return this.getParameterByte(); - } - - public byte[] getParameterByte() { - return parameterByte; - } - - public void setParameterByte(byte[] parameterByte) { - this.parameterByte = parameterByte; - } - - @Override - public void parameterBytes(byte[] value) { - this.setParameterByte(value); + public short cmdCode() { + return this.cmdCode; } @Override - public byte[] extraPayload() { - return this.extraPayload; + public void cmdCode(short code) { + this.cmdCode = code; } @Override - public void extraPayload(byte[] value) { - this.extraPayload = value; + public short cmdVersion() { + return this.cmdVersion; } @Override - public <T> T parameter(SerializerFactory serializerFactory, Class<T> c) { - if (this.parameter() != null) - return (T) this.parameter(); - final T decode = serializerFactory.get(this.serializerType()).decode(this.parameterBytes(), c); - this.parameter(decode); - return decode; + public void cmdVersion(short version) { + this.cmdVersion = version; } @Override - public <T> T parameter(SerializerFactory serializerFactory, TypePresentation<T> typePresentation) { - if (this.parameter() != null) - return (T) this.parameter(); - final T decode = serializerFactory.get(this.serializerType()).decode(this.parameterBytes(), typePresentation); - this.parameter(decode); - return decode; + public byte[] payload() { + return this.payload; } @Override - public <T> T parameter(SerializerFactory serializerFactory, Type type) { - if (this.parameter() != null) - return (T) this.parameter(); - final T decode = serializerFactory.get(this.serializerType()).decode(this.parameterBytes(), type); - this.parameter(decode); - return decode; + public void payload(byte[] payload) { + this.payload = payload; } @Override diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/SerializerFactory.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingSysResponseCode.java similarity index 70% rename from remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/SerializerFactory.java rename to remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingSysResponseCode.java index b47bf99..ae76c6f 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/SerializerFactory.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingSysResponseCode.java @@ -15,14 +15,17 @@ * limitations under the License. */ -package org.apache.rocketmq.remoting.api.serializable; +package org.apache.rocketmq.remoting.impl.command; -public interface SerializerFactory { - void register(Serializer serialization); +public class RemotingSysResponseCode { - byte type(String serializationName); + public static final short SUCCESS = 0; - Serializer get(byte type); + public static final short SYSTEM_ERROR = 1; - void clearAll(); + public static final short SYSTEM_BUSY = 2; + + public static final short REQUEST_CODE_NOT_SUPPORTED = 3; + + public static final short TRANSACTION_FAILED = 4; } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/CompressorFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/compression/CompressorFactoryImpl.java similarity index 91% rename from remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/CompressorFactoryImpl.java rename to remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/compression/CompressorFactoryImpl.java index 10e97ba..40576ad 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/CompressorFactoryImpl.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/compression/CompressorFactoryImpl.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package org.apache.rocketmq.remoting.impl.protocol.compression; +package org.apache.rocketmq.remoting.impl.compression; -import org.apache.rocketmq.remoting.api.compressable.Compressor; -import org.apache.rocketmq.remoting.api.compressable.CompressorFactory; +import org.apache.rocketmq.remoting.api.compression.Compressor; +import org.apache.rocketmq.remoting.api.compression.CompressorFactory; public class CompressorFactoryImpl implements CompressorFactory { private static final int MAX_COUNT = 0x0FF; diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/GZipCompressor.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/compression/GZipCompressor.java similarity index 95% rename from remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/GZipCompressor.java rename to remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/compression/GZipCompressor.java index fc33f4c..53dd4bf 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/GZipCompressor.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/compression/GZipCompressor.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.remoting.impl.protocol.compression; +package org.apache.rocketmq.remoting.impl.compression; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -23,7 +23,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; -import org.apache.rocketmq.remoting.api.compressable.Compressor; +import org.apache.rocketmq.remoting.api.compression.Compressor; public class GZipCompressor implements Compressor { public static final int BUFFER = 1024; diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java index a4c33e1..5026224 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -48,8 +48,6 @@ import org.apache.rocketmq.remoting.api.interceptor.Interceptor; import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.api.interceptor.RequestContext; import org.apache.rocketmq.remoting.api.interceptor.ResponseContext; -import org.apache.rocketmq.remoting.api.serializable.Serializer; -import org.apache.rocketmq.remoting.api.serializable.SerializerFactory; import org.apache.rocketmq.remoting.common.ChannelEventListenerGroup; import org.apache.rocketmq.remoting.common.Pair; import org.apache.rocketmq.remoting.common.ResponseResult; @@ -58,7 +56,7 @@ import org.apache.rocketmq.remoting.config.RemotingConfig; import org.apache.rocketmq.remoting.external.ThreadUtils; import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl; import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl; -import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl; +import org.apache.rocketmq.remoting.impl.command.RemotingSysResponseCode; import org.apache.rocketmq.remoting.internal.UIDGenerator; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; @@ -66,7 +64,6 @@ import org.slf4j.LoggerFactory; public abstract class NettyRemotingAbstract implements RemotingService { protected static final Logger LOG = LoggerFactory.getLogger(NettyRemotingAbstract.class); - protected final SerializerFactory serializerFactory = new SerializerFactoryImpl(); protected final ChannelEventExecutor channelEventExecutor = new ChannelEventExecutor("ChannelEventExecutor"); private final Semaphore semaphoreOneway; private final Semaphore semaphoreAsync; @@ -87,11 +84,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { this.publicExecutor = ThreadUtils.newFixedThreadPool( clientConfig.getClientAsyncCallbackExecutorThreads(), 10000, "Remoting-PublicExecutor", true); - this.remotingCommandFactory = new RemotingCommandFactoryImpl(clientConfig.getSerializerName()); - } - - public SerializerFactory getSerializerFactory() { - return serializerFactory; + this.remotingCommandFactory = new RemotingCommandFactoryImpl(); } protected void putNettyEvent(final NettyChannelEvent event) { @@ -158,7 +151,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { extractRemoteAddress(ctx.channel()), cmd, e, "FLOW_CONTROL")); RemotingCommand response = remotingCommandFactory.createResponse(cmd); - response.opCode(RemotingCommand.CommandFlag.ERROR.flag()); + response.opCode(RemotingSysResponseCode.SYSTEM_BUSY); response.remark("SYSTEM_BUSY"); writeAndFlush(ctx.channel(), response); } @@ -194,19 +187,9 @@ public abstract class NettyRemotingAbstract implements RemotingService { if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) { //FiXME Exception interceptor can not throw exception interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE, extractRemoteAddress(ctx.channel()), cmd, e, "")); - RemotingCommand response = remotingCommandFactory.createResponse(cmd); - response.opCode(RemotingCommand.CommandFlag.ERROR.flag()); - response.remark(serializeException(cmd.serializerType(), e)); - response.property("Exception", e.getClass().getName()); - ctx.writeAndFlush(response); } } - private String serializeException(byte serializeType, Throwable exception) { - final Serializer serialization = getSerializerFactory().get(serializeType); - return serialization.encode(exception).toString(); - } - private void handleResponse(RemotingCommand response, RemotingCommand cmd, ChannelHandlerContext ctx) { if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) { if (response != null) { @@ -508,11 +491,6 @@ public abstract class NettyRemotingAbstract implements RemotingService { } @Override - public SerializerFactory serializerFactory() { - return this.serializerFactory; - } - - @Override public RemotingCommandFactory commandFactory() { return this.remotingCommandFactory; } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java index ec1d69d..f239ee9 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java @@ -22,7 +22,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; -import java.nio.ByteBuffer; import java.util.List; import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper; import org.apache.rocketmq.remoting.api.command.RemotingCommand; @@ -55,13 +54,12 @@ public class Decoder extends ByteToMessageDecoder { private Object decode(final ChannelHandlerContext ctx, ByteBufferWrapper wrapper) throws Exception { int originReaderIndex = wrapper.readerIndex(); - byte type = wrapper.readByte(); + byte magic = wrapper.readByte(); try { - RemotingCommand cmd = decode(wrapper, originReaderIndex); - if (cmd != null) { - cmd.protocolType(type); + if (magic != CodecHelper.PROTOCOL_MAGIC) { + throw new RemoteCodecException(String.format("MagicCode %d is wrong, expect %d", magic, CodecHelper.PROTOCOL_MAGIC)); } - return cmd; + return decode(wrapper, originReaderIndex); } catch (final RemoteCodecException e) { LOG.warn("Decode error {}, close the channel {}", e.getMessage(), ctx.channel()); ctx.channel().close().addListener(new ChannelFutureListener() { @@ -76,7 +74,7 @@ public class Decoder extends ByteToMessageDecoder { public RemotingCommand decode(final ByteBufferWrapper wrapper, final int originReaderIndex) { // Full message isn't available yet, return nothing for now - if (wrapper.readableBytes() < CodecHelper.MIN_PROTOCOL_LEN - 1) { + if (wrapper.readableBytes() < CodecHelper.MIN_PROTOCOL_LEN - 1 /*MagicCode*/) { wrapper.setReaderIndex(originReaderIndex); return null; } @@ -91,17 +89,10 @@ public class Decoder extends ByteToMessageDecoder { throw new IllegalArgumentException(String.format("Total length %d is more than limit %d", totalLength, CodecHelper.PACKET_MAX_LEN)); } - if (wrapper.readableBytes() < totalLength) { + if (wrapper.readableBytes() < totalLength - 1 /*MagicCode*/ - 4 /*TotalLen*/) { wrapper.setReaderIndex(originReaderIndex); return null; } - - ByteBuffer totalBuffer = ByteBuffer.allocate(totalLength); - - wrapper.readBytes(totalBuffer); - - totalBuffer.flip(); - - return CodecHelper.decode(totalBuffer); + return CodecHelper.decode(wrapper); } } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java index 10aa504..78e5e5c 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java @@ -23,22 +23,16 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper; import org.apache.rocketmq.remoting.api.command.RemotingCommand; -import org.apache.rocketmq.remoting.api.serializable.Serializer; -import org.apache.rocketmq.remoting.api.serializable.SerializerFactory; import org.apache.rocketmq.remoting.impl.buffer.NettyByteBufferWrapper; import org.apache.rocketmq.remoting.impl.command.CodecHelper; -import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Encoder extends MessageToByteEncoder<RemotingCommand> { private static final Logger LOG = LoggerFactory.getLogger(Encoder.class); - private final SerializerFactory serializerFactory = new SerializerFactoryImpl(); - public Encoder() { } @@ -47,7 +41,7 @@ public class Encoder extends MessageToByteEncoder<RemotingCommand> { try { ByteBufferWrapper wrapper = new NettyByteBufferWrapper(out); - encode(serializerFactory, remotingCommand, wrapper); + encode(remotingCommand, wrapper); } catch (final Exception e) { LOG.error("Error occurred when encoding response for channel " + ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress(), e); if (remotingCommand != null) { @@ -62,28 +56,7 @@ public class Encoder extends MessageToByteEncoder<RemotingCommand> { } } - public void encode(final SerializerFactory serializerFactory, final RemotingCommand remotingCommand, - final ByteBufferWrapper out) { - ByteBuffer encodeParameter = null; - if (remotingCommand.parameterBytes() != null) { - encodeParameter = ByteBuffer.wrap(remotingCommand.parameterBytes()); - } else if (remotingCommand.parameter() != null) { - final Serializer serialization = serializerFactory.get(remotingCommand.serializerType()); - encodeParameter = serialization.encode(remotingCommand.parameter()); - } - - int parameterLength = encodeParameter != null ? encodeParameter.limit() : 0; - int extBodyLength = remotingCommand.extraPayload() != null ? remotingCommand.extraPayload().length : 0; - - ByteBuffer header = CodecHelper.encodeHeader(remotingCommand, parameterLength, extBodyLength); - out.writeBytes(header); - - if (encodeParameter != null) { - out.writeBytes(encodeParameter); - } - - if (remotingCommand.extraPayload() != null) { - out.writeBytes(remotingCommand.extraPayload()); - } + public void encode(final RemotingCommand remotingCommand, final ByteBufferWrapper out) { + CodecHelper.encodeCommand(remotingCommand, out); } } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/JsonSerializer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/JsonSerializer.java deleted file mode 100644 index c85d44b..0000000 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/JsonSerializer.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.impl.protocol.serializer; - -import com.alibaba.fastjson.JSON; -import java.lang.reflect.Type; -import java.nio.ByteBuffer; -import org.apache.rocketmq.remoting.api.serializable.Serializer; -import org.apache.rocketmq.remoting.common.TypePresentation; -import org.apache.rocketmq.remoting.impl.command.CodecHelper; - -public class JsonSerializer implements Serializer { - public static final String SERIALIZER_NAME = JsonSerializer.class.getSimpleName(); - public static final byte SERIALIZER_TYPE = 'J'; - - public JsonSerializer() { - } - - @Override - public String name() { - return SERIALIZER_NAME; - } - - @Override - public byte type() { - return SERIALIZER_TYPE; - } - - @Override - public <T> T decode(final byte[] content, final Class<T> c) { - if (content != null) { - try { - final String jsonString = new String(content, CodecHelper.REMOTING_CHARSET); - return JSON.parseObject(jsonString, c); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - return null; - } - - @Override - public <T> T decode(final byte[] content, final TypePresentation<T> typePresentation) { - return decode(content, typePresentation.getType()); - } - - @Override - public <T> T decode(byte[] content, Type type) { - if (content != null) { - try { - final String jsonString = new String(content, CodecHelper.REMOTING_CHARSET); - return JSON.parseObject(jsonString, type); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - return null; - } - - @Override - public ByteBuffer encode(final Object object) { - if (object != null) { - String jsonString = JSON.toJSONString(object); - byte[] bytes = jsonString.getBytes(CodecHelper.REMOTING_CHARSET); - try { - return ByteBuffer.wrap(bytes, 0, bytes.length); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - return null; - } -} diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/Kryo3Serializer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/Kryo3Serializer.java deleted file mode 100644 index 06ea217..0000000 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/Kryo3Serializer.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.impl.protocol.serializer; - -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; -import java.nio.ByteBuffer; -import org.apache.rocketmq.remoting.api.serializable.Serializer; -import org.apache.rocketmq.remoting.common.TypePresentation; - -public class Kryo3Serializer implements Serializer { - public static final String SERIALIZER_NAME = Kryo3Serializer.class.getSimpleName(); - public static final byte SERIALIZER_TYPE = 'K'; - - public Kryo3Serializer() { - } - - @Override - public String name() { - return SERIALIZER_NAME; - } - - @Override - public byte type() { - return SERIALIZER_TYPE; - } - - @Override - public <T> T decode(final byte[] content, final Class<T> c) { - if (content != null) { - Input input = null; - try { - input = new Input(content); - return (T) ThreadSafeKryo.getKryoInstance().readClassAndObject(input); - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - input.close(); - } - } - - return null; - } - - @Override - public <T> T decode(final byte[] content, final TypePresentation<T> typePresentation) { - return decode(content, typePresentation.getType()); - } - - @Override - public <T> T decode(byte[] content, Type type) { - if (type instanceof ParameterizedType) { - return decode(content, (Class<? extends T>) ((ParameterizedType) type).getRawType()); - } else if (type instanceof Class) { - return decode(content, (Class<? extends T>) type); - } - return null; - } - - @Override - public ByteBuffer encode(final Object object) { - if (object != null) { - try (Output output = new Output(1024, 1024 * 1024 * 6)) { - ThreadSafeKryo.getKryoInstance().writeClassAndObject(output, object); - return ByteBuffer.wrap(output.getBuffer(), 0, output.position()); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - return null; - } -} diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/MsgPackSerializer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/MsgPackSerializer.java deleted file mode 100644 index 1097f8f..0000000 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/MsgPackSerializer.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.impl.protocol.serializer; - -import java.lang.reflect.Type; -import java.nio.ByteBuffer; -import org.apache.rocketmq.remoting.api.serializable.Serializer; -import org.apache.rocketmq.remoting.common.TypePresentation; -import org.msgpack.MessagePack; -import org.msgpack.template.Template; - -public class MsgPackSerializer implements Serializer { - public static final String SERIALIZER_NAME = MsgPackSerializer.class.getSimpleName(); - public static final byte SERIALIZER_TYPE = 'M'; - private final MessagePack messagePack = new MessagePack(); - - public MsgPackSerializer() { - } - - @Override - public String name() { - return SERIALIZER_NAME; - } - - @Override - public byte type() { - return SERIALIZER_TYPE; - } - - @Override - public <T> T decode(final byte[] content, final Class<T> c) { - try { - return messagePack.read(content, c); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public <T> T decode(final byte[] content, final TypePresentation<T> typePresentation) { - return decode(content, typePresentation.getType()); - } - - @Override - public <T> T decode(byte[] content, Type type) { - Template<T> template = (Template<T>) messagePack.lookup(type); - try { - return messagePack.read(content, template); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public ByteBuffer encode(final Object object) { - try { - byte[] data = messagePack.write(object); - return ByteBuffer.wrap(data); - } catch (Exception e) { - throw new RuntimeException(e); - } - } -} diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializerFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializerFactoryImpl.java deleted file mode 100644 index 632b61f..0000000 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializerFactoryImpl.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.impl.protocol.serializer; - -import org.apache.rocketmq.remoting.api.serializable.Serializer; -import org.apache.rocketmq.remoting.api.serializable.SerializerFactory; - -public class SerializerFactoryImpl implements SerializerFactory { - private static final int MAX_COUNT = 0x0FF; - private final Serializer[] tables = new Serializer[MAX_COUNT]; - - public SerializerFactoryImpl() { - this.register(new JsonSerializer()); - this.register(new Kryo3Serializer()); - this.register(new MsgPackSerializer()); - } - - @Override - public void register(Serializer serialization) { - if (tables[serialization.type() & MAX_COUNT] != null) { - throw new RuntimeException("serialization header's sign is overlapped"); - } - tables[serialization.type() & MAX_COUNT] = serialization; - } - - @Override - public byte type(final String serializationName) { - for (Serializer table : this.tables) { - if (table != null) { - if (table.name().equalsIgnoreCase(serializationName)) { - return table.type(); - } - } - } - - throw new IllegalArgumentException(String.format("the serialization: %s not exist", serializationName)); - } - - @Override - public Serializer get(byte type) { - return tables[type & MAX_COUNT]; - } - - @Override - public void clearAll() { - for (int i = 0; i < this.tables.length; i++) { - this.tables[i] = null; - } - } - - public Serializer[] getTables() { - return tables; - } -} diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/ThreadSafeKryo.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/ThreadSafeKryo.java deleted file mode 100644 index cadfc27..0000000 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/ThreadSafeKryo.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.impl.protocol.serializer; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.KryoSerializable; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.Calendar; -import java.util.Collection; -import java.util.Collections; -import java.util.Currency; -import java.util.Date; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.Locale; -import java.util.Map; -import java.util.TreeMap; -import java.util.TreeSet; -import org.objenesis.strategy.StdInstantiatorStrategy; - -public class ThreadSafeKryo { - private static final ThreadLocal<Kryo> KRYOS = new ThreadLocal<Kryo>() { - protected Kryo initialValue() { - Kryo kryo = new Kryo(); - - kryo.register(byte[].class); - kryo.register(char[].class); - kryo.register(short[].class); - kryo.register(int[].class); - kryo.register(long[].class); - kryo.register(float[].class); - kryo.register(double[].class); - kryo.register(boolean[].class); - kryo.register(String[].class); - kryo.register(Object[].class); - kryo.register(KryoSerializable.class); - kryo.register(BigInteger.class); - kryo.register(BigDecimal.class); - kryo.register(Class.class); - kryo.register(Date.class); - // kryo.register(Enum.class); - kryo.register(EnumSet.class); - kryo.register(Currency.class); - kryo.register(StringBuffer.class); - kryo.register(StringBuilder.class); - kryo.register(Collections.EMPTY_LIST.getClass()); - kryo.register(Collections.EMPTY_MAP.getClass()); - kryo.register(Collections.EMPTY_SET.getClass()); - kryo.register(Collections.singletonList(null).getClass()); - kryo.register(Collections.singletonMap(null, null).getClass()); - kryo.register(Collections.singleton(null).getClass()); - kryo.register(TreeSet.class); - kryo.register(Collection.class); - kryo.register(TreeMap.class); - kryo.register(Map.class); - try { - kryo.register(Class.forName("sun.util.calendar.ZoneInfo")); - } catch (ClassNotFoundException e) { - // Noop - } - kryo.register(Calendar.class); - kryo.register(Locale.class); - - kryo.register(BitSet.class); - kryo.register(HashMap.class); - kryo.register(Timestamp.class); - kryo.register(ArrayList.class); - - // kryo.setRegistrationRequired(true); - kryo.setReferences(false); - kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); - - return kryo; - } - }; - - public static Kryo getKryoInstance() { - return KRYOS.get(); - } -}
