This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch INLONG-25 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit e00a5ba1bfaf95360ae0648e140732b3a4229003 Author: Zijie Lu <[email protected]> AuthorDate: Thu May 6 10:45:16 2021 +0800 Add missing file Signed-off-by: Zijie Lu <[email protected]> --- .../tubemq-client-go/codec/tubemq_codec.go | 138 +++++++++++++++++++++ 1 file changed, 138 insertions(+) diff --git a/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go new file mode 100644 index 0000000..96ebfa9 --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package codec defines the encoding and decoding logic between TubeMQ. +// If the protocol of encoding and decoding is changed, only this package +// will need to be changed. +package codec + +import ( + "bufio" + "encoding/binary" + "errors" + "io" +) + +const ( + // The default begin token of TubeMQ RPC protocol. + RPCProtocolBeginToken uint32 = 0xFF7FF4FE + // The default max buffer size the RPC response. + RPCMaxBufferSize uint32 = 8192 + frameHeadLen uint32 = 12 + maxBufferSize int = 128 * 1024 + defaultMsgSize int = 4096 + dataLen uint32 = 4 + listSizeLen uint32 = 4 + serialNoLen uint32 = 4 + beginTokenLen uint32 = 4 +) + +// TubeMQDecoder is the implementation of the decoder of response from TubeMQ. +type TubeMQDecoder struct { + reader io.Reader + msg []byte +} + +// New will return a default TubeMQDecoder. +func New(reader io.Reader) *TubeMQDecoder { + bufferReader := bufio.NewReaderSize(reader, maxBufferSize) + return &TubeMQDecoder{ + msg: make([]byte, defaultMsgSize), + reader: bufferReader, + } +} + +// Decode will decode the response from TubeMQ to Response according to +// the RPC protocol of TubeMQ. +func (t *TubeMQDecoder) Decode() (Response, error) { + var num int + var err error + if num, err = io.ReadFull(t.reader, t.msg[:frameHeadLen]); err != nil { + return nil, err + } + if num != int(frameHeadLen) { + return nil, errors.New("framer: read frame header num invalid") + } + if binary.BigEndian.Uint32(t.msg[:beginTokenLen]) != RPCProtocolBeginToken { + return nil, errors.New("framer: read framer rpc protocol begin token not match") + } + serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen]) + listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen]) + totalLen := int(frameHeadLen) + for i := 0; i < int(listSize); i++ { + size := make([]byte, 4) + n, err := io.ReadFull(t.reader, size) + if err != nil { + return nil, err + } + if n != int(dataLen) { + return nil, errors.New("framer: read invalid size") + } + + s := int(binary.BigEndian.Uint32(size)) + if totalLen+s > len(t.msg) { + data := t.msg[:totalLen] + t.msg = make([]byte, 0, max(2*len(t.msg), totalLen+s)) + copy(t.msg, data[:]) + } + + if num, err = io.ReadFull(t.reader, t.msg[totalLen:totalLen+s]); err != nil { + return nil, err + } + if num != s { + return nil, errors.New("framer: read invalid data") + } + totalLen += s + } + + data := make([]byte, totalLen-int(frameHeadLen)) + copy(data, t.msg[frameHeadLen:totalLen]) + + return &TubeMQResponse{ + serialNo: serialNo, + Buffer: data, + }, nil +} + +// TubeMQRequest is the implementation of TubeMQ request. +type TubeMQRequest struct { + serialNo uint32 + req []byte +} + +// TubeMQResponse is the TubeMQ implementation of Response. +type TubeMQResponse struct { + serialNo uint32 + Buffer []byte +} + +// GetSerialNo will return the SerialNo of Response. +func (t TubeMQResponse) GetSerialNo() uint32 { + return t.serialNo +} + +// GetResponseBuf will return the body of Response. +func (t TubeMQResponse) GetBuffer() []byte { + return t.Buffer +} + +func max(x, y int) int { + if x < y { + return y + } + return x +}
