pandaapo commented on code in PR #4702:
URL: https://github.com/apache/eventmesh/pull/4702#discussion_r1439021521
##########
eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java:
##########
@@ -48,139 +46,122 @@
@Slf4j
public class Codec extends ByteToMessageCodec<Package> {
- private static final int FRAME_MAX_LENGTH = 1024 * 1024 * 4;
+ private static final int FRAME_MAX_LENGTH = 1024 * 1024 * 4; // 4M
private static final byte[] CONSTANT_MAGIC_FLAG =
serializeBytes("EventMesh");
private static final byte[] VERSION = serializeBytes("0000");
- private Encoder encoder = new Encoder();
- private Decoder decoder = new Decoder();
-
@Override
protected void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out)
throws Exception {
- encoder.encode(ctx, pkg, out);
- }
+ Preconditions.checkNotNull(pkg, "TcpPackage cannot be null");
+ final Header header = pkg.getHeader();
+ Preconditions.checkNotNull(header, "TcpPackage header cannot be null",
header);
+ if (log.isDebugEnabled()) {
+ log.debug("Encoder pkg={}", JsonUtils.toJSONString(pkg));
+ }
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object>
out) throws Exception {
- decoder.decode(ctx, in, out);
- }
+ final byte[] headerData = JsonUtils.toJSONBytes(header);
+ final byte[] bodyData;
- public static class Encoder extends MessageToByteEncoder<Package> {
-
- @Override
- public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf
out) throws Exception {
- Preconditions.checkNotNull(pkg, "TcpPackage cannot be null");
- final Header header = pkg.getHeader();
- Preconditions.checkNotNull(header, "TcpPackage header cannot be
null", header);
- if (log.isDebugEnabled()) {
- log.debug("Encoder pkg={}", JsonUtils.toJSONString(pkg));
- }
-
- final byte[] headerData = JsonUtils.toJSONBytes(header);
- final byte[] bodyData;
-
- if (StringUtils.equals(Constants.CLOUD_EVENTS_PROTOCOL_NAME,
header.getStringProperty(Constants.PROTOCOL_TYPE))) {
- bodyData = (byte[]) pkg.getBody();
- } else {
- bodyData = JsonUtils.toJSONBytes(pkg.getBody());
- }
-
- int headerLength = ArrayUtils.getLength(headerData);
- int bodyLength = ArrayUtils.getLength(bodyData);
-
- final int length = CONSTANT_MAGIC_FLAG.length + VERSION.length +
headerLength + bodyLength;
-
- if (length > FRAME_MAX_LENGTH) {
- throw new IllegalArgumentException("message size is exceed
limit!");
- }
- /**
- * Header + Body, Format:
- * <pre>
- *
┌───────────────┬─────────────┬──────────────────┬──────────────────┬──────────────────┬─────────────────┐
- * │ MAGIC_FLAG │ VERSION │ package length │ Header
length │ Header │ body │
- * │ (9bytes) │ (4bytes) │ (4bytes) │
(4bytes) │ (header bytes) │ (body bytes) │
- *
└───────────────┴─────────────┴──────────────────┴──────────────────┴──────────────────┴─────────────────┘
- * </pre>
- */
- out.writeBytes(CONSTANT_MAGIC_FLAG);
- out.writeBytes(VERSION);
- out.writeInt(length);
- out.writeInt(headerLength);
- if (headerData != null) {
- out.writeBytes(headerData);
- }
- if (bodyData != null) {
- out.writeBytes(bodyData);
- }
+ if (StringUtils.equals(Constants.CLOUD_EVENTS_PROTOCOL_NAME,
header.getStringProperty(Constants.PROTOCOL_TYPE))) {
+ bodyData = (byte[]) pkg.getBody();
+ } else {
+ bodyData = JsonUtils.toJSONBytes(pkg.getBody());
}
- }
- public static class Decoder extends ReplayingDecoder<Package> {
-
- @Override
- public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object>
out) throws Exception {
- try {
- if (null == in) {
- return;
- }
-
- byte[] flagBytes = parseFlag(in);
- byte[] versionBytes = parseVersion(in);
- validateFlag(flagBytes, versionBytes, ctx);
-
- final int length = in.readInt();
- final int headerLength = in.readInt();
- final int bodyLength = length - CONSTANT_MAGIC_FLAG.length -
VERSION.length - headerLength;
- Header header = parseHeader(in, headerLength);
- Object body = parseBody(in, header, bodyLength);
-
- Package pkg = new Package(header, body);
- out.add(pkg);
- } catch (Exception e) {
- log.error("decode error| received data: {}.",
deserializeBytes(in.array()), e);
- throw e;
- }
- }
+ int headerLength = ArrayUtils.getLength(headerData);
+ int bodyLength = ArrayUtils.getLength(bodyData);
+
+ final int length = CONSTANT_MAGIC_FLAG.length + VERSION.length +
headerLength + bodyLength;
- private byte[] parseFlag(ByteBuf in) {
- final byte[] flagBytes = new byte[CONSTANT_MAGIC_FLAG.length];
- in.readBytes(flagBytes);
- return flagBytes;
+ if (length > FRAME_MAX_LENGTH) {
+ throw new IllegalArgumentException("message size is exceed
limit!");
+ }
+ /**
+ * Header + Body, Format:
+ * <pre>
+ *
┌───────────────┬─────────────┬──────────────────┬──────────────────┬──────────────────┬─────────────────┐
+ * │ MAGIC_FLAG │ VERSION │ package length │ Header length
│ Header │ body │
+ * │ (9bytes) │ (4bytes) │ (4bytes) │ (4bytes)
│ (header bytes) │ (body bytes) │
+ *
└───────────────┴─────────────┴──────────────────┴──────────────────┴──────────────────┴─────────────────┘
+ * </pre>
+ */
+ out.writeBytes(CONSTANT_MAGIC_FLAG);
+ out.writeBytes(VERSION);
+ out.writeInt(length);
+ out.writeInt(headerLength);
+ if (headerData != null) {
+ out.writeBytes(headerData);
}
+ if (bodyData != null) {
+ out.writeBytes(bodyData);
+ }
+ }
- private byte[] parseVersion(ByteBuf in) {
- final byte[] versionBytes = new byte[VERSION.length];
- in.readBytes(versionBytes);
- return versionBytes;
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object>
out) throws Exception {
+ if (in == null) {
+ return;
+ }
+ if (in.readableBytes() < CONSTANT_MAGIC_FLAG.length + VERSION.length +
4 + 4) {
Review Comment:
Could these two magic numbers (`4 + 4`) be replaced with named constants, to
make it clear that they are the byte lengths of the package-length field and
the header-length field?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]