This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch INLONG-25 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 877505e72540d9b3a98de2333a3afd3e933eb1d0 Author: Zijie Lu <[email protected]> AuthorDate: Mon May 24 11:06:45 2021 +0800 [INLONG-604]rpc request for Go SDK Signed-off-by: Zijie Lu <[email protected]> --- .../tubemq-client-go/client/info.go | 129 ++++++++++ .../tubemq-client-go/client/remote.go | 79 ++++++ .../tubemq-client-go/codec/tubemq_codec.go | 8 +- .../tubemq-client-go/config/config.go | 12 + tubemq-client-twins/tubemq-client-go/errs/errs.go | 50 ++++ .../tubemq-client-go/metadata/consumer_event.go | 56 +++++ .../tubemq-client-go/metadata/metadata.go | 47 ++++ .../tubemq-client-go/metadata/node.go | 55 +++++ .../tubemq-client-go/metadata/partition.go | 57 +++++ .../tubemq-client-go/metadata/subcribe_info.go | 55 +++++ tubemq-client-twins/tubemq-client-go/rpc/broker.go | 273 +++++++++++++++++++++ tubemq-client-twins/tubemq-client-go/rpc/client.go | 72 ++++++ tubemq-client-twins/tubemq-client-go/rpc/master.go | 212 ++++++++++++++++ 13 files changed, 1104 insertions(+), 1 deletion(-) diff --git a/tubemq-client-twins/tubemq-client-go/client/info.go b/tubemq-client-twins/tubemq-client-go/client/info.go new file mode 100644 index 0000000..25a2686 --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/client/info.go @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package client defines the api and information +// which can be exposed to user. +package client + +import ( + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol" +) + +// InvalidOffset represents the offset which is invalid. +const InValidOffset = -1 + +// SubInfo represents the sub information of the client. +type SubInfo struct { + clientID string + boundConsume bool + selectBig bool + sourceCount int32 + sessionKey string + notAllocated bool + firstRegistered bool + subscribedTime int64 + boundPartitions string + topics []string + topicConds []string + topicFilter map[string]bool + assignedPartitions map[string]uint64 + topicFilters map[string][]string + authInfo *protocol.AuthorizedInfo + masterCertificateInfo *protocol.MasterCertificateInfo +} + +// GetClientID returns the client ID. +func (s *SubInfo) GetClientID() string { + return s.clientID +} + +// IsFiltered returns whether a topic if filtered. +func (s *SubInfo) IsFiltered(topic string) bool { + if filtered, ok := s.topicFilter[topic]; ok { + return filtered + } + return false +} + +// GetTopicFilters returns the topic filters. +func (s *SubInfo) GetTopicFilters() map[string][]string { + return s.topicFilters +} + +// GetAssignedPartOffset returns the assignedPartOffset of the given partitionKey. +func (s *SubInfo) GetAssignedPartOffset(partitionKey string) int64 { + if !s.firstRegistered && s.boundConsume && s.notAllocated { + if offset, ok := s.assignedPartitions[partitionKey]; ok { + return int64(offset) + } + } + return InValidOffset +} + +// BoundConsume returns whether it is bondConsume. +func (s *SubInfo) BoundConsume() bool { + return s.boundConsume +} + +// GetSubscribedTime returns the subscribedTime. +func (s *SubInfo) GetSubscribedTime() int64 { + return s.subscribedTime +} + +// GetTopics returns the topics. +func (s *SubInfo) GetTopics() []string { + return s.topics +} + +// GetTopicConds returns the topicConds. +func (s *SubInfo) GetTopicConds() []string { + return s.topicConds +} + +// GetSessionKey returns the sessionKey. +func (s *SubInfo) GetSessionKey() string { + return s.sessionKey +} + +// SelectBig returns whether it is selectBig. +func (s *SubInfo) SelectBig() bool { + return s.selectBig +} + +// GetSourceCount returns the sourceCount. +func (s *SubInfo) GetSourceCount() int32 { + return s.sourceCount +} + +// GetBoundPartInfo returns the boundPartitions. +func (s *SubInfo) GetBoundPartInfo() string { + return s.boundPartitions +} + +// IsNotAllocated returns whether it is not allocated. +func (s *SubInfo) IsNotAllocated() bool { + return s.notAllocated +} + +// GetAuthorizedInfo returns the authInfo. +func (s *SubInfo) GetAuthorizedInfo() *protocol.AuthorizedInfo { + return s.authInfo +} + +func (s *SubInfo) GetMasterCertificateIInfo() *protocol.MasterCertificateInfo { + return s.masterCertificateInfo +} diff --git a/tubemq-client-twins/tubemq-client-go/client/remote.go b/tubemq-client-twins/tubemq-client-go/client/remote.go new file mode 100644 index 0000000..fd60425 --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/client/remote.go @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package client defines the api and information +// which can be exposed to user. +package client + +import ( + "sync" + + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata" +) + +// RmtDataCache represents the data returned from TubeMQ. +type RmtDataCache struct { + consumerID string + groupName string + underGroupCtrl bool + defFlowCtrlID int64 + groupFlowCtrlID int64 + subscribeInfo []*metadata.SubscribeInfo + rebalanceResults []*metadata.ConsumerEvent + mu sync.Mutex + brokerToPartitions map[*metadata.Node][]*metadata.Partition +} + +// GetUnderGroupCtrl returns the underGroupCtrl. +func (r *RmtDataCache) GetUnderGroupCtrl() bool { + return r.underGroupCtrl +} + +// GetDefFlowCtrlID returns the defFlowCtrlID. +func (r *RmtDataCache) GetDefFlowCtrlID() int64 { + return r.defFlowCtrlID +} + +// GetGroupFlowCtrlID returns the groupFlowCtrlID. +func (r *RmtDataCache) GetGroupFlowCtrlID() int64 { + return r.groupFlowCtrlID +} + +// GetSubscribeInfo returns the subscribeInfo. +func (r *RmtDataCache) GetSubscribeInfo() []*metadata.SubscribeInfo { + return r.subscribeInfo +} + +// PollEventResult polls the first event result from the rebalanceResults. +func (r *RmtDataCache) PollEventResult() *metadata.ConsumerEvent { + r.mu.Lock() + defer r.mu.Unlock() + if len(r.rebalanceResults) > 0 { + event := r.rebalanceResults[0] + r.rebalanceResults = r.rebalanceResults[1:] + return event + } + return nil +} + +// GetPartitionByBroker returns the +func (r *RmtDataCache) GetPartitionByBroker(node *metadata.Node) []*metadata.Partition { + if partitions, ok := r.brokerToPartitions[node]; ok { + return partitions + } + return nil +} diff --git a/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go index 4a8898f..f805e08 100644 --- a/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go +++ b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go @@ -49,6 +49,7 @@ const ( ) var serialNo uint32 + func NewSerialNo() uint32 { return atomic.AddUint32(&serialNo, 1) } @@ -151,6 +152,12 @@ type TubeMQRPCRequest struct { Body proto.Message } +func NewRPCRequest() *TubeMQRPCRequest { + return &TubeMQRPCRequest{ + serialNo: NewSerialNo(), + } +} + // GetSerialNo returns the serialNo. func (t *TubeMQRPCRequest) GetSerialNo() uint32 { return t.serialNo @@ -295,4 +302,3 @@ func readDelimitedFrom(data []byte, msg proto.Message) ([]byte, error) { return data[int(size)+n:], nil } - diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go b/tubemq-client-twins/tubemq-client-go/config/config.go new file mode 100644 index 0000000..999d87a --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/config/config.go @@ -0,0 +1,12 @@ +package config + +import ( + "time" +) + +type Config struct { + Net struct { + // How long to wait for a response. + ReadTimeout time.Duration + } +} diff --git a/tubemq-client-twins/tubemq-client-go/errs/errs.go b/tubemq-client-twins/tubemq-client-go/errs/errs.go new file mode 100644 index 0000000..637e381 --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/errs/errs.go @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// package errs defines the TubeMQ error codes and TubeMQ error msg. +package errs + +import ( + "fmt" +) + +const ( + RetMarshalFailure = 1 + RetResponseException = 2 + RetUnMarshalFailure = 3 + RetAssertionFailure = 4 +) + +var ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure") + +// Error provides a TubeMQ-specific error container +type Error struct { + Code int32 + Msg string +} + +func (e *Error) Error() string { + return fmt.Sprintf("code: %d, msg:%s", e.Code, e.Msg) +} + +func New(code int32, msg string) error { + err := &Error{ + Code: code, + Msg: msg, + } + return err +} diff --git a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go new file mode 100644 index 0000000..2b4ce49 --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metadata + +// ConsumerEvent represents the metadata of a consumer event +type ConsumerEvent struct { + rebalanceID int64 + eventType int32 + eventStatus int32 + subscribeInfo []*SubscribeInfo +} + +// GetRebalanceID returns the rebalanceID of a consumer event. +func (c *ConsumerEvent) GetRebalanceID() int64 { + return c.rebalanceID +} + +// GetEventType returns the event type of a consumer event. +func (c *ConsumerEvent) GetEventType() int32 { + return c.eventType +} + +// GetEventStatus returns the event status of a consumer event. +func (c *ConsumerEvent) GetEventStatus() int32 { + return c.eventStatus +} + +// SetEventType sets the event type. +func (c *ConsumerEvent) SetEventType(eventType int32) { + c.eventType = eventType +} + +// SetEventStatus sets the event status. +func (c *ConsumerEvent) SetEventStatus(eventStatus int32) { + c.eventStatus = eventStatus +} + +// GetSubscribeInfo returns the list of SubscribeInfo of a consumer event. +func (c *ConsumerEvent) GetSubscribeInfo() []*SubscribeInfo { + return c.subscribeInfo +} diff --git a/tubemq-client-twins/tubemq-client-go/metadata/metadata.go b/tubemq-client-twins/tubemq-client-go/metadata/metadata.go new file mode 100644 index 0000000..861a06e --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/metadata/metadata.go @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// package metadata defines all the metadata of the TubeMQ broker and producer. +package metadata + +// Metadata represents the metadata of +type Metadata struct { + node *Node + subscribeInfo *SubscribeInfo + readStatus int32 + reportTimes bool +} + +// GetNode returns the node of the metadata. +func (m *Metadata) GetNode() *Node { + return m.node +} + +// GetSubscribeInfo returns the SubscribeInfo of the metadata. +func (m *Metadata) GetSubscribeInfo() *SubscribeInfo { + return m.subscribeInfo +} + +// GetReadStatus returns the read status. +func (m *Metadata) GetReadStatus() int32 { + return m.readStatus +} + +// GetReportTimes returns the report times. +func (m *Metadata) GetReportTimes() bool { + return m.reportTimes +} diff --git a/tubemq-client-twins/tubemq-client-go/metadata/node.go b/tubemq-client-twins/tubemq-client-go/metadata/node.go new file mode 100644 index 0000000..625f278 --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/metadata/node.go @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metadata + +import ( + "strconv" +) + +// Node represents the metadata of a node. +type Node struct { + id uint32 + host string + port uint32 + address string +} + +// GetID returns the id of a node. +func (n *Node) GetID() uint32 { + return n.id +} + +// GetPort returns the port of a node. +func (n *Node) GetPort() uint32 { + return n.port +} + +// GetHost returns the hostname of a node. +func (n *Node) GetHost() string { + return n.host +} + +// GetAddress returns the address of a node. +func (n *Node) GetAddress() string { + return n.address +} + +// String returns the metadata of a node as a string. +func (n *Node) String() string { + return strconv.Itoa(int(n.id)) + ":" + n.host + ":" + strconv.Itoa(int(n.port)) +} diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go b/tubemq-client-twins/tubemq-client-go/metadata/partition.go new file mode 100644 index 0000000..a180214 --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metadata + +import ( + "strconv" +) + +// Partition represents the metadata of a partition. +type Partition struct { + topic string + Broker *Node + partitionID int32 + partitionKey string + offset int64 + lastConsumed bool +} + +// GetLastConsumed returns lastConsumed of a partition. +func (p *Partition) GetLastConsumed() bool { + return p.lastConsumed +} + +// GetPartitionID returns the partition id of a partition. +func (p *Partition) GetPartitionID() int32 { + return p.partitionID +} + +// GetPartitionKey returns the partition key of a partition. +func (p *Partition) GetPartitionKey() string { + return p.partitionKey +} + +// GetTopic returns the topic of the partition subscribed to. +func (p *Partition) GetTopic() string { + return p.topic +} + +// String returns the metadata of a Partition as a string. +func (p *Partition) String() string { + return p.Broker.String() + "#" + p.topic + "@" + strconv.Itoa(int(p.partitionID)) +} diff --git a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go new file mode 100644 index 0000000..d76437a --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metadata + +import ( + "fmt" +) + +// SubscribeInfo represents the metadata of the subscribe info. +type SubscribeInfo struct { + group string + consumerID string + partition *Partition + qryPriorityID int32 +} + +// GetGroup returns the group name. +func (s *SubscribeInfo) GetGroup() string { + return s.group +} + +// GetConsumerID returns the consumer id. +func (s *SubscribeInfo) GetConsumerID() string { + return s.consumerID +} + +// GetPartition returns the partition. +func (s *SubscribeInfo) GetPartition() *Partition { + return s.partition +} + +// GetQryPriorityID returns the QryPriorityID. +func (s *SubscribeInfo) GetQryPriorityID() int32 { + return s.qryPriorityID +} + +// String returns the contents of SubscribeInfo as a string. +func (s *SubscribeInfo) String() string { + return fmt.Sprintf("%s@%s-%s", s.consumerID, s.group, s.partition.String()) +} diff --git a/tubemq-client-twins/tubemq-client-go/rpc/broker.go b/tubemq-client-twins/tubemq-client-go/rpc/broker.go new file mode 100644 index 0000000..aa7b831 --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/rpc/broker.go @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rpc + +import ( + "github.com/golang/protobuf/proto" + + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol" +) + +const ( + Register = 31 + Unregister = 32 +) + +const ( + BrokerProducerRegister = iota + 11 + BrokerProducerHeartbeat + BrokerProducerSendMsg + BrokerProducerClose + BrokerConsumerRegister + BrokerConsumerHeartbeat + BrokerConsumerGetMsg + BrokerConsumerCommit + BrokerConsumerClose +) + +// RegisterRequestC2B implements the RegisterRequestC2B interface according to TubeMQ RPC protocol. +func (c *rpcClient) RegisterRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) { + reqC2B := &protocol.RegisterRequestC2B{ + OpType: proto.Int32(Register), + ClientId: proto.String(sub.GetClientID()), + GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()), + TopicName: proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()), + PartitionId: proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()), + QryPriorityId: proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()), + ReadStatus: proto.Int32(metadata.GetReadStatus()), + AuthInfo: sub.GetAuthorizedInfo(), + } + if sub.IsFiltered(metadata.GetSubscribeInfo().GetPartition().GetTopic()) { + tfs := sub.GetTopicFilters() + reqC2B.FilterCondStr = make([]string, 0, len(tfs[metadata.GetSubscribeInfo().GetPartition().GetTopic()])) + for _, tf := range tfs[metadata.GetSubscribeInfo().GetPartition().GetTopic()] { + reqC2B.FilterCondStr = append(reqC2B.FilterCondStr, tf) + } + } + offset := sub.GetAssignedPartOffset(metadata.GetSubscribeInfo().GetPartition().GetPartitionKey()) + if offset != client.InValidOffset { + reqC2B.CurrOffset = proto.Int64(offset) + } + data, err := proto.Marshal(reqC2B) + if err != nil { + return nil, errs.New(errs.RetMarshalFailure, err.Error()) + } + req := codec.NewRPCRequest() + req.RpcHeader = &protocol.RpcConnHeader{ + Flag: proto.Int32(0), + } + req.RequestHeader = &protocol.RequestHeader{ + ServiceType: proto.Int32(ReadService), + ProtocolVer: proto.Int32(2), + } + req.RequestBody = &protocol.RequestBody{ + Method: proto.Int32(BrokerConsumerRegister), + Request: data, + Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), + } + rsp, err := c.client.DoRequest(c.ctx, req) + if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { + if v.ResponseException != nil { + return nil, errs.New(errs.RetResponseException, v.ResponseException.String()) + } + rspC2B := &protocol.RegisterResponseB2C{} + err := proto.Unmarshal(v.ResponseBody.Data, rspC2B) + if err != nil { + return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) + } + return rspC2B, nil + } + return nil, errs.ErrAssertionFailure +} + +// UnregisterRequestC2B implements the UnregisterRequestC2B interface according to TubeMQ RPC protocol. +func (c *rpcClient) UnregisterRequestC2B(metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) { + reqC2B := &protocol.RegisterRequestC2B{ + OpType: proto.Int32(Unregister), + ClientId: proto.String(sub.GetClientID()), + GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()), + TopicName: proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()), + PartitionId: proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()), + ReadStatus: proto.Int32(metadata.GetReadStatus()), + AuthInfo: sub.GetAuthorizedInfo(), + } + req := codec.NewRPCRequest() + req.RpcHeader = &protocol.RpcConnHeader{ + Flag: proto.Int32(0), + } + data, err := proto.Marshal(reqC2B) + if err != nil { + return nil, errs.New(errs.RetMarshalFailure, err.Error()) + } + req.RequestHeader = &protocol.RequestHeader{ + ServiceType: proto.Int32(ReadService), + ProtocolVer: proto.Int32(2), + } + req.RequestBody = &protocol.RequestBody{ + Method: proto.Int32(BrokerConsumerRegister), + Request: data, + Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), + } + rsp, err := c.client.DoRequest(c.ctx, req) + if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { + if v.ResponseException != nil { + return nil, errs.New(errs.RetResponseException, v.ResponseException.String()) + } + rspC2B := &protocol.RegisterResponseB2C{} + err := proto.Unmarshal(v.ResponseBody.Data, rspC2B) + if err != nil { + return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) + } + return rspC2B, nil + } + return nil, errs.ErrAssertionFailure +} + +// GetMessageRequestC2B implements the GetMessageRequestC2B interface according to TubeMQ RPC protocol. +func (c *rpcClient) GetMessageRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error) { + reqC2B := &protocol.GetMessageRequestC2B{ + ClientId: proto.String(sub.GetClientID()), + PartitionId: proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()), + GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()), + TopicName: proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()), + EscFlowCtrl: proto.Bool(r.GetUnderGroupCtrl()), + LastPackConsumed: proto.Bool(metadata.GetSubscribeInfo().GetPartition().GetLastConsumed()), + ManualCommitOffset: proto.Bool(false), + } + req := codec.NewRPCRequest() + req.RpcHeader = &protocol.RpcConnHeader{ + Flag: proto.Int32(0), + } + data, err := proto.Marshal(reqC2B) + if err != nil { + return nil, errs.New(errs.RetMarshalFailure, err.Error()) + } + req.RequestHeader = &protocol.RequestHeader{ + ServiceType: proto.Int32(ReadService), + ProtocolVer: proto.Int32(2), + } + req.RequestBody = &protocol.RequestBody{ + Method: proto.Int32(BrokerConsumerGetMsg), + Request: data, + Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), + } + rsp, err := c.client.DoRequest(c.ctx, req) + if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { + if v.ResponseException != nil { + return nil, errs.New(errs.RetResponseException, v.ResponseException.String()) + } + rspC2B := &protocol.GetMessageResponseB2C{} + err := proto.Unmarshal(v.ResponseBody.Data, rspC2B) + if err != nil { + return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) + } + return rspC2B, nil + } + return nil, errs.ErrAssertionFailure +} + +// CommitOffsetRequestC2B implements the CommitOffsetRequestC2B interface according to TubeMQ RPC protocol. +func (c *rpcClient) CommitOffsetRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error) { + reqC2B := &protocol.CommitOffsetRequestC2B{ + ClientId: proto.String(sub.GetClientID()), + TopicName: proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()), + PartitionId: proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()), + GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()), + LastPackConsumed: proto.Bool(metadata.GetSubscribeInfo().GetPartition().GetLastConsumed()), + } + req := codec.NewRPCRequest() + req.RpcHeader = &protocol.RpcConnHeader{ + Flag: proto.Int32(10), + } + data, err := proto.Marshal(reqC2B) + if err != nil { + return nil, errs.New(errs.RetMarshalFailure, err.Error()) + } + req.RequestHeader = &protocol.RequestHeader{ + ServiceType: proto.Int32(ReadService), + ProtocolVer: proto.Int32(2), + } + req.RequestBody = &protocol.RequestBody{ + Method: proto.Int32(BrokerConsumerHeartbeat), + Request: data, + Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), + } + rsp, err := c.client.DoRequest(c.ctx, req) + if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { + if v.ResponseException != nil { + return nil, errs.New(errs.RetResponseException, v.ResponseException.String()) + } + rspC2B := &protocol.CommitOffsetResponseB2C{} + err := proto.Unmarshal(v.ResponseBody.Data, rspC2B) + if err != nil { + return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) + } + return rspC2B, nil + } + return nil, errs.ErrAssertionFailure +} + +// HeartbeatRequestC2B implements the HeartbeatRequestC2B interface according to TubeMQ RPC protocol. +func (c *rpcClient) HeartbeatRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) { + reqC2B := &protocol.HeartBeatRequestC2B{ + ClientId: proto.String(sub.GetClientID()), + GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()), + ReadStatus: proto.Int32(metadata.GetReadStatus()), + QryPriorityId: proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()), + AuthInfo: sub.GetAuthorizedInfo(), + } + partitions := r.GetPartitionByBroker(metadata.GetNode()) + reqC2B.PartitionInfo = make([]string, 0, len(partitions)) + for _, partition := range partitions { + reqC2B.PartitionInfo = append(reqC2B.PartitionInfo, partition.String()) + } + data, err := proto.Marshal(reqC2B) + if err != nil { + return nil, errs.New(errs.RetMarshalFailure, err.Error()) + } + req := codec.NewRPCRequest() + req.RequestHeader = &protocol.RequestHeader{ + ServiceType: proto.Int32(ReadService), + ProtocolVer: proto.Int32(2), + } + req.RequestBody = &protocol.RequestBody{ + Method: proto.Int32(BrokerConsumerHeartbeat), + Request: data, + Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), + } + req.RpcHeader = &protocol.RpcConnHeader{ + Flag: proto.Int32(0), + } + rsp, err := c.client.DoRequest(c.ctx, req) + if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { + if v.ResponseException != nil { + return nil, errs.New(errs.RetResponseException, v.ResponseException.String()) + } + rspC2B := &protocol.HeartBeatResponseB2C{} + err := proto.Unmarshal(v.ResponseBody.Data, rspC2B) + if err != nil { + return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) + } + return rspC2B, nil + } + return nil, errs.ErrAssertionFailure +} diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go b/tubemq-client-twins/tubemq-client-go/rpc/client.go new file mode 100644 index 0000000..79a446e --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package rpc encapsulates all the rpc request to TubeMQ. +package rpc + +import ( + "context" + + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport" +) + +const ( + ReadService = 2 + AdminService = 4 +) + +// RPCClient is the rpc level client to interact with TubeMQ. +type RPCClient interface { + // RegisterRequestC2B is the rpc request for a consumer to register to a broker. + RegisterRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) + // UnregisterRequestC2B is the rpc request for a consumer to unregister to a broker. + UnregisterRequestC2B(metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) + // GetMessageRequestC2B is the rpc request for a consumer to get message from a broker. + GetMessageRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error) + // CommitOffsetRequestC2B is the rpc request for a consumer to commit offset to a broker. + CommitOffsetRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error) + // HeartbeatRequestC2B is the rpc request for a consumer to send heartbeat to a broker. + HeartbeatRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) + // RegisterRequestC2M is the rpc request for a consumer to register request to master. + RegisterRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error) + // HeartRequestC2M is the rpc request for a consumer to send heartbeat to master. + HeartRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error) + // CloseRequestC2M is the rpc request for a consumer to be closed to master. + CloseRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error) +} + +type rpcClient struct { + pool *multiplexing.Pool + client *transport.Client + ctx context.Context + config *config.Config +} + +// New returns a default TubeMQ rpc Client +func New(pool *multiplexing.Pool, opts *transport.Options, ctx context.Context, config *config.Config) RPCClient { + return &rpcClient{ + pool: pool, + client: transport.New(opts, pool), + ctx: ctx, + config: config, + } +} diff --git a/tubemq-client-twins/tubemq-client-go/rpc/master.go b/tubemq-client-twins/tubemq-client-go/rpc/master.go new file mode 100644 index 0000000..4f9e7f9 --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/rpc/master.go @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rpc + +import ( + "github.com/golang/protobuf/proto" + + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol" +) + +const ( + MasterProducerRegister = iota + 1 + MasterProducerHeartbeat + MasterProducerClose + MasterConsumerRegister + MasterConsumerHeartbeat + MasterConsumerClose +) + +// RegisterRequestRequestC2M implements the RegisterRequestRequestC2M interface according to TubeMQ RPC protocol. +func (c *rpcClient) RegisterRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error) { + reqC2M := &protocol.RegisterRequestC2M{ + ClientId: proto.String(sub.GetClientID()), + HostName: proto.String(metadata.GetNode().GetHost()), + GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()), + RequireBound: proto.Bool(sub.BoundConsume()), + SessionTime: proto.Int64(sub.GetSubscribedTime()), + DefFlowCheckId: proto.Int64(r.GetDefFlowCtrlID()), + GroupFlowCheckId: proto.Int64(r.GetGroupFlowCtrlID()), + QryPriorityId: proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()), + AuthInfo: sub.GetMasterCertificateIInfo(), + } + reqC2M.TopicList = make([]string, 0, len(sub.GetTopics())) + reqC2M.TopicList = append(reqC2M.TopicList, sub.GetTopics()...) + + reqC2M.SubscribeInfo = make([]string, 0, len(r.GetSubscribeInfo())) + for _, s := range r.GetSubscribeInfo() { + reqC2M.SubscribeInfo = append(reqC2M.SubscribeInfo, s.String()) + } + + reqC2M.TopicCondition = make([]string, 0, len(sub.GetTopicConds())) + reqC2M.TopicCondition = append(reqC2M.TopicCondition, sub.GetTopicConds()...) + + if sub.BoundConsume() { + reqC2M.SessionKey = proto.String(sub.GetSessionKey()) + reqC2M.SelectBig = proto.Bool(sub.SelectBig()) + reqC2M.TotalCount = proto.Int32(sub.GetSourceCount()) + reqC2M.RequiredPartition = proto.String(sub.GetBoundPartInfo()) + reqC2M.NotAllocated = proto.Bool(sub.IsNotAllocated()) + } + + reqC2M.AuthInfo = &protocol.MasterCertificateInfo{ + AuthInfo: &protocol.AuthenticateInfo{}, + } + + data, err := proto.Marshal(reqC2M) + if err != nil { + return nil, errs.New(errs.RetMarshalFailure, err.Error()) + } + req := codec.NewRPCRequest() + req.RpcHeader = &protocol.RpcConnHeader{ + Flag: proto.Int32(0), + } + req.RequestHeader = &protocol.RequestHeader{ + ServiceType: proto.Int32(AdminService), + ProtocolVer: proto.Int32(2), + } + req.RequestBody = &protocol.RequestBody{ + Method: proto.Int32(MasterConsumerRegister), + Request: data, + Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), + } + rsp, err := c.client.DoRequest(c.ctx, req) + if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { + if v.ResponseException != nil { + return nil, errs.New(errs.RetResponseException, err.Error()) + } + rspM2C := &protocol.RegisterResponseM2C{} + err := proto.Unmarshal(v.ResponseBody.Data, rspM2C) + if err != nil { + return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) + } + return rspM2C, nil + } + return nil, errs.ErrAssertionFailure +} + +// HeartRequestC2M implements the HeartRequestC2M interface according to TubeMQ RPC protocol. +func (c *rpcClient) HeartRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error) { + reqC2M := &protocol.HeartRequestC2M{ + ClientId: proto.String(sub.GetClientID()), + GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()), + DefFlowCheckId: proto.Int64(r.GetDefFlowCtrlID()), + GroupFlowCheckId: proto.Int64(r.GetGroupFlowCtrlID()), + ReportSubscribeInfo: proto.Bool(false), + QryPriorityId: proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()), + } + event := r.PollEventResult() + if event != nil || metadata.GetReportTimes() { + reqC2M.ReportSubscribeInfo = proto.Bool(true) + subscribeInfo := r.GetSubscribeInfo() + if len(subscribeInfo) > 0 { + reqC2M.SubscribeInfo = make([]string, 0, len(subscribeInfo)) + for _, s := range subscribeInfo { + reqC2M.SubscribeInfo = append(reqC2M.SubscribeInfo, s.String()) + } + } + } + if event != nil { + ep := &protocol.EventProto{ + RebalanceId: proto.Int64(event.GetRebalanceID()), + OpType: proto.Int32(event.GetEventType()), + Status: proto.Int32(event.GetEventStatus()), + } + si := event.GetSubscribeInfo() + ep.SubscribeInfo = make([]string, 0, len(si)) + for _, s := range si { + ep.SubscribeInfo = append(ep.SubscribeInfo, s.String()) + } + } + reqC2M.AuthInfo = &protocol.MasterCertificateInfo{ + AuthInfo: &protocol.AuthenticateInfo{}, + } + data, err := proto.Marshal(reqC2M) + if err != nil { + return nil, errs.New(errs.RetMarshalFailure, err.Error()) + } + req := codec.NewRPCRequest() + req.RequestHeader = &protocol.RequestHeader{ + ServiceType: proto.Int32(AdminService), + ProtocolVer: proto.Int32(2), + } + req.RequestBody = &protocol.RequestBody{ + Method: proto.Int32(MasterConsumerHeartbeat), + Request: data, + Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), + } + req.RpcHeader = &protocol.RpcConnHeader{ + Flag: proto.Int32(0), + } + rsp, err := c.client.DoRequest(c.ctx, req) + if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { + if v.ResponseException != nil { + return nil, errs.New(errs.RetResponseException, err.Error()) + } + rspM2C := &protocol.HeartResponseM2C{} + err := proto.Unmarshal(v.ResponseBody.Data, rspM2C) + if err != nil { + return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) + } + return rspM2C, nil + } + return nil, errs.ErrAssertionFailure +} + +// CloseRequestC2M implements the CloseRequestC2M interface according to TubeMQ RPC protocol. +func (c *rpcClient) CloseRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error) { + reqC2M := &protocol.CloseRequestC2M{ + ClientId: proto.String(sub.GetClientID()), + GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()), + AuthInfo: sub.GetMasterCertificateIInfo(), + } + data, err := proto.Marshal(reqC2M) + if err != nil { + return nil, errs.New(errs.RetMarshalFailure, err.Error()) + } + req := codec.NewRPCRequest() + req.RequestHeader = &protocol.RequestHeader{ + ServiceType: proto.Int32(AdminService), + ProtocolVer: proto.Int32(2), + } + req.RequestBody = &protocol.RequestBody{ + Method: proto.Int32(MasterConsumerClose), + Request: data, + Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), + } + req.RpcHeader = &protocol.RpcConnHeader{ + Flag: proto.Int32(0), + } + rsp, err := c.client.DoRequest(c.ctx, req) + if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { + if v.ResponseException != nil { + return nil, errs.New(errs.RetResponseException, err.Error()) + } + rspM2C := &protocol.CloseResponseM2C{} + err := proto.Unmarshal(v.ResponseBody.Data, rspM2C) + if err != nil { + return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) + } + return rspM2C, nil + } + return nil, errs.ErrAssertionFailure +}
