This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch INLONG-25 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 0d4ed41f647eb5155c936f10f0f61113c44161b3 Author: Zijie Lu <[email protected]> AuthorDate: Thu Jun 3 16:02:09 2021 +0800 [INLONG-624]Go SDK consumer interface Signed-off-by: Zijie Lu <[email protected]> --- .../{metadata/node.go => client/consumer.go} | 51 ++- .../tubemq-client-go/client/consumer_impl.go | 359 +++++++++++++++++++++ .../tubemq-client-go/client/heartbeat.go | 229 +++++++++++++ .../tubemq-client-go/client/remote.go | 79 ----- .../tubemq-client-go/config/config.go | 18 +- .../tubemq-client-go/config/config_test.go | 10 +- tubemq-client-twins/tubemq-client-go/errs/errs.go | 12 +- .../tubemq-client-go/metadata/consumer_event.go | 8 + .../tubemq-client-go/metadata/metadata.go | 20 ++ .../tubemq-client-go/metadata/node.go | 54 ++++ .../tubemq-client-go/metadata/partition.go | 42 ++- .../tubemq-client-go/metadata/subcribe_info.go | 54 +++- .../tubemq-client-go/remote/remote.go | 331 +++++++++++++++++++ tubemq-client-twins/tubemq-client-go/rpc/broker.go | 32 +- tubemq-client-twins/tubemq-client-go/rpc/client.go | 23 +- tubemq-client-twins/tubemq-client-go/rpc/master.go | 19 +- .../tubemq-client-go/{client => sub}/info.go | 77 ++++- .../tubemq-client-go/transport/client.go | 8 +- tubemq-client-twins/tubemq-client-go/util/util.go | 52 +++ 19 files changed, 1299 insertions(+), 179 deletions(-) diff --git a/tubemq-client-twins/tubemq-client-go/metadata/node.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go similarity index 52% copy from tubemq-client-twins/tubemq-client-go/metadata/node.go copy to tubemq-client-twins/tubemq-client-go/client/consumer.go index 625f278..27a8536 100644 --- a/tubemq-client-twins/tubemq-client-go/metadata/node.go +++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go @@ -15,41 +15,32 @@ * limitations under the License. */ -package metadata +// Package client defines the api and information +// which can be exposed to user. +package client -import ( - "strconv" +const ( + tubeMQClientVersion = "0.1.0" ) -// Node represents the metadata of a node. -type Node struct { - id uint32 - host string - port uint32 - address string +// ConsumerResult of a consumption. +type ConsumerResult struct { } -// GetID returns the id of a node. -func (n *Node) GetID() uint32 { - return n.id +// ConsumerOffset of a consumption, +type ConsumerOffset struct { } -// GetPort returns the port of a node. -func (n *Node) GetPort() uint32 { - return n.port -} - -// GetHost returns the hostname of a node. -func (n *Node) GetHost() string { - return n.host -} - -// GetAddress returns the address of a node. -func (n *Node) GetAddress() string { - return n.address -} - -// String returns the metadata of a node as a string. -func (n *Node) String() string { - return strconv.Itoa(int(n.id)) + ":" + n.host + ":" + strconv.Itoa(int(n.port)) +var clientIndex uint64 + +// Consumer is an interface that abstracts behavior of TubeMQ's consumer +type Consumer interface { + // Start starts the consumer. + Start() error + // GetMessage receive a single message. + GetMessage() (*ConsumerResult, error) + // Confirm the consumption of a message. + Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) + // GetCurrConsumedInfo returns the consumptions of the consumer. + GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) } diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go new file mode 100644 index 0000000..c0f7e9a --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go @@ -0,0 +1,359 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client + +import ( + "context" + "os" + "strconv" + "sync/atomic" + "time" + + "github.com/golang/protobuf/proto" + + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util" +) + +const ( + consumeStatusNormal = 0 + consumeStatusFromMax = 1 + consumeStatusFromMaxAlways = 2 +) + +type consumer struct { + clientID string + config *config.Config + subInfo *sub.SubInfo + rmtDataCache *remote.RmtDataCache + visitToken int64 + authorizedInfo string + nextAuth2Master int32 + nextAuth2Broker int32 + master *selector.Node + client rpc.RPCClient + selector selector.Selector + lastMasterHb int64 + masterHBRetry int + heartbeatManager *heartbeatManager + unreportedTimes int +} + +// NewConsumer returns a consumer which is constructed by a given config. +func NewConsumer(config *config.Config) (Consumer, error) { + selector, err := selector.Get("ip") + if err != nil { + return nil, err + } + + clientID := newClientID(config.Consumer.Group) + pool := multiplexing.NewPool() + opts := &transport.Options{} + if config.Net.TLS.Enable { + opts.CACertFile = config.Net.TLS.CACertFile + opts.TLSCertFile = config.Net.TLS.TLSCertFile + opts.TLSKeyFile = config.Net.TLS.TLSKeyFile + opts.TLSServerName = config.Net.TLS.TLSServerName + } + client := rpc.New(pool, opts) + r := remote.NewRmtDataCache() + r.SetConsumerInfo(clientID, config.Consumer.Group) + c := &consumer{ + config: config, + clientID: clientID, + subInfo: sub.NewSubInfo(config), + rmtDataCache: r, + selector: selector, + client: client, + visitToken: util.InvalidValue, + unreportedTimes: 0, + } + c.subInfo.SetClientID(clientID) + hbm := newHBManager(c) + c.heartbeatManager = hbm + return c, nil +} + +// Start implementation of tubeMQ consumer. +func (c *consumer) Start() error { + err := c.register2Master(false) + if err != nil { + return err + } + c.heartbeatManager.registerMaster(c.master.Address) + go c.processRebalanceEvent() + return nil +} + +func (c *consumer) register2Master(needChange bool) error { + if needChange { + node, err := c.selector.Select(c.config.Consumer.Masters) + if err != nil { + return err + } + c.master = node + } + for c.master.HasNext { + ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) + + m := &metadata.Metadata{} + node := &metadata.Node{} + node.SetHost(util.GetLocalHost()) + node.SetAddress(c.master.Address) + m.SetNode(node) + sub := &metadata.SubscribeInfo{} + sub.SetGroup(c.config.Consumer.Group) + m.SetSubscribeInfo(sub) + + auth := &protocol.AuthenticateInfo{} + c.genMasterAuthenticateToken(auth, true) + mci := &protocol.MasterCertificateInfo{ + AuthInfo: auth, + } + c.subInfo.SetMasterCertificateInfo(mci) + + rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache) + if err != nil { + cancel() + return err + } + if rsp.GetSuccess() { + c.masterHBRetry = 0 + c.processRegisterResponseM2C(rsp) + cancel() + return nil + } else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden { + cancel() + return nil + } else { + c.master, err = c.selector.Select(c.config.Consumer.Masters) + cancel() + if err != nil { + return err + } + } + } + return nil +} + +func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) { + if rsp.GetNotAllocated() { + c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated()) + } + if rsp.GetDefFlowCheckId() != 0 || rsp.GetDefFlowCheckId() != 0 { + if rsp.GetDefFlowCheckId() != 0 { + c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo()) + } + qryPriorityID := c.rmtDataCache.GetQryPriorityID() + if rsp.GetQryPriorityId() != 0 { + qryPriorityID = rsp.GetQryPriorityId() + } + c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo()) + } + if rsp.GetAuthorizedInfo() != nil { + c.processAuthorizedToken(rsp.GetAuthorizedInfo()) + } + c.lastMasterHb = time.Now().UnixNano() / int64(time.Millisecond) +} + +func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo) { + atomic.StoreInt64(&c.visitToken, info.GetVisitAuthorizedToken()) + c.authorizedInfo = info.GetAuthAuthorizedToken() +} + +// GetMessage implementation of TubeMQ consumer. +func (c *consumer) GetMessage() (*ConsumerResult, error) { + panic("implement me") +} + +// Confirm implementation of TubeMQ consumer. +func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) { + panic("implement me") +} + +// GetCurrConsumedInfo implementation of TubeMQ consumer. +func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) { + panic("implement me") +} + +func (c *consumer) processRebalanceEvent() { + for { + event := c.rmtDataCache.TakeEvent() + if event == nil { + continue + } + if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue { + break + } + c.rmtDataCache.ClearEvent() + switch event.GetEventType() { + case 2, 20: + c.disconnect2Broker(event) + c.rmtDataCache.OfferEvent(event) + case 1, 10: + c.connect2Broker(event) + c.rmtDataCache.OfferEvent(event) + } + } +} + +func (c *consumer) disconnect2Broker(event *metadata.ConsumerEvent) { + subscribeInfo := event.GetSubscribeInfo() + if len(subscribeInfo) > 0 { + removedPartitions := make(map[*metadata.Node][]*metadata.Partition) + c.rmtDataCache.RemoveAndGetPartition(subscribeInfo, c.config.Consumer.RollbackIfConfirmTimeout, removedPartitions) + if len(removedPartitions) > 0 { + c.unregister2Broker(removedPartitions) + } + } + event.SetEventStatus(2) +} + +func (c *consumer) unregister2Broker(unRegPartitions map[*metadata.Node][]*metadata.Partition) { + if len(unRegPartitions) == 0 { + return + } + + for _, partitions := range unRegPartitions { + for _, partition := range partitions { + ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) + + m := &metadata.Metadata{} + node := &metadata.Node{} + node.SetHost(util.GetLocalHost()) + node.SetAddress(partition.GetBroker().GetAddress()) + m.SetNode(node) + m.SetReadStatus(1) + sub := &metadata.SubscribeInfo{} + sub.SetGroup(c.config.Consumer.Group) + sub.SetConsumerID(c.clientID) + sub.SetPartition(partition) + m.SetSubscribeInfo(sub) + + c.client.UnregisterRequestC2B(ctx, m, c.subInfo) + cancel() + } + } +} + +func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) { + if len(event.GetSubscribeInfo()) > 0 { + unsubPartitions := c.rmtDataCache.FilterPartitions(event.GetSubscribeInfo()) + if len(unsubPartitions) > 0 { + for _, partition := range unsubPartitions { + m := &metadata.Metadata{} + node := &metadata.Node{} + node.SetHost(util.GetLocalHost()) + node.SetAddress(partition.GetBroker().GetAddress()) + m.SetNode(node) + sub := &metadata.SubscribeInfo{} + sub.SetGroup(c.config.Consumer.Group) + sub.SetConsumerID(c.clientID) + sub.SetPartition(partition) + m.SetSubscribeInfo(sub) + isFirstRegister := c.rmtDataCache.IsFirstRegister(partition.GetPartitionKey()) + m.SetReadStatus(c.getConsumeReadStatus(isFirstRegister)) + auth := c.genBrokerAuthenticInfo(true) + c.subInfo.SetAuthorizedInfo(auth) + + ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) + rsp, err := c.client.RegisterRequestC2B(ctx, m, c.subInfo, c.rmtDataCache) + if err != nil { + //todo add log + } + if rsp.GetSuccess() { + c.rmtDataCache.AddNewPartition(partition) + c.heartbeatManager.registerBroker(node) + } + cancel() + } + } + } + c.subInfo.FirstRegistered() + event.SetEventStatus(2) +} + +func newClientIndex() uint64 { + return atomic.AddUint64(&clientIndex, 1) +} + +func newClientID(group string) string { + return group + "_" + + util.GetLocalHost() + "_" + + strconv.Itoa(os.Getpid()) + "_" + + strconv.Itoa(int(time.Now().Unix()*1000)) + "_" + + strconv.Itoa(int(newClientIndex())) + "_" + + tubeMQClientVersion +} + +func (c *consumer) genBrokerAuthenticInfo(force bool) *protocol.AuthorizedInfo { + needAdd := false + auth := &protocol.AuthorizedInfo{} + if c.config.Net.Auth.Enable { + if force { + needAdd = true + atomic.StoreInt32(&c.nextAuth2Broker, 0) + } else if atomic.LoadInt32(&c.nextAuth2Broker) == 1 { + if atomic.CompareAndSwapInt32(&c.nextAuth2Broker, 1, 0) { + needAdd = true + } + } + if needAdd { + authToken := util.GenBrokerAuthenticateToken(c.config.Net.Auth.UserName, c.config.Net.Auth.Password) + auth.AuthAuthorizedToken = proto.String(authToken) + } + } + return auth +} + +func (c *consumer) genMasterAuthenticateToken(auth *protocol.AuthenticateInfo, force bool) { + needAdd := false + if c.config.Net.Auth.Enable { + if force { + needAdd = true + atomic.StoreInt32(&c.nextAuth2Master, 0) + } else if atomic.LoadInt32(&c.nextAuth2Master) == 1 { + if atomic.CompareAndSwapInt32(&c.nextAuth2Master, 1, 0) { + needAdd = true + } + } + if needAdd { + } + } +} + +func (c *consumer) getConsumeReadStatus(isFirstReg bool) int32 { + readStatus := consumeStatusNormal + if isFirstReg { + if c.config.Consumer.ConsumePosition == 0 { + readStatus = consumeStatusFromMax + } else if c.config.Consumer.ConsumePosition > 0 { + readStatus = consumeStatusFromMaxAlways + } + } + return int32(readStatus) +} diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go new file mode 100644 index 0000000..9c29854 --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client + +import ( + "context" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util" +) + +type heartbeatMetadata struct { + numConnections int + timer *time.Timer +} + +type heartbeatManager struct { + consumer *consumer + heartbeats map[string]*heartbeatMetadata + mu sync.Mutex +} + +func newHBManager(consumer *consumer) *heartbeatManager { + return &heartbeatManager{ + consumer: consumer, + heartbeats: make(map[string]*heartbeatMetadata), + } +} + +func (h *heartbeatManager) registerMaster(address string) { + h.mu.Lock() + defer h.mu.Unlock() + if _, ok := h.heartbeats[address]; !ok { + h.heartbeats[address] = &heartbeatMetadata{ + numConnections: 1, + timer: time.AfterFunc(h.consumer.config.Heartbeat.Interval/2, h.consumerHB2Master), + } + } + hm := h.heartbeats[address] + hm.numConnections++ +} + +func (h *heartbeatManager) registerBroker(broker *metadata.Node) { + h.mu.Lock() + defer h.mu.Unlock() + + if _, ok := h.heartbeats[broker.GetAddress()]; !ok { + h.heartbeats[broker.GetAddress()] = &heartbeatMetadata{ + numConnections: 1, + timer: time.AfterFunc(h.consumer.config.Heartbeat.Interval, func() { h.consumerHB2Broker(broker) }), + } + } + hm := h.heartbeats[broker.GetAddress()] + hm.numConnections++ +} + +func (h *heartbeatManager) consumerHB2Master() { + if time.Now().UnixNano()/int64(time.Millisecond)-h.consumer.lastMasterHb > 30000 { + h.consumer.rmtDataCache.HandleExpiredPartitions(h.consumer.config.Consumer.MaxConfirmWait) + } + m := &metadata.Metadata{} + node := &metadata.Node{} + node.SetHost(util.GetLocalHost()) + node.SetAddress(h.consumer.master.Address) + m.SetNode(node) + sub := &metadata.SubscribeInfo{} + sub.SetGroup(h.consumer.config.Consumer.Group) + m.SetSubscribeInfo(sub) + h.consumer.unreportedTimes++ + if h.consumer.unreportedTimes > h.consumer.config.Consumer.MaxSubInfoReportInterval { + m.SetReportTimes(true) + h.consumer.unreportedTimes = 0 + } + + retry := 0 + for retry < h.consumer.config.Heartbeat.MaxRetryTimes { + ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout) + rsp, err := h.consumer.client.HeartRequestC2M(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache) + if err != nil { + cancel() + } + if rsp.GetSuccess() { + cancel() + h.processHBResponseM2C(rsp) + break + } else if rsp.GetErrCode() == errs.RetErrHBNoNode || strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 { + cancel() + h.consumer.masterHBRetry++ + address := h.consumer.master.Address + go h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode) + if rsp.GetErrCode() != errs.RetErrHBNoNode { + hm := h.heartbeats[address] + hm.numConnections-- + if hm.numConnections == 0 { + h.mu.Lock() + delete(h.heartbeats, address) + h.mu.Unlock() + } + } + return + } + cancel() + } + h.mu.Lock() + defer h.mu.Unlock() + hm := h.heartbeats[h.consumer.master.Address] + hm.timer.Reset(h.nextHeartbeatInterval()) +} + +func (h *heartbeatManager) processHBResponseM2C(rsp *protocol.HeartResponseM2C) { + h.consumer.masterHBRetry = 0 + h.consumer.subInfo.SetIsNotAllocated(rsp.GetNotAllocated()) + if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 { + if rsp.GetDefFlowCheckId() != 0 { + h.consumer.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo()) + } + qryPriorityID := h.consumer.rmtDataCache.GetQryPriorityID() + if rsp.GetQryPriorityId() != 0 { + qryPriorityID = rsp.GetQryPriorityId() + } + h.consumer.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo()) + } + if rsp.GetAuthorizedInfo() != nil { + h.consumer.processAuthorizedToken(rsp.GetAuthorizedInfo()) + } + if rsp.GetRequireAuth() { + atomic.StoreInt32(&h.consumer.nextAuth2Master, 1) + } + if rsp.GetEvent() != nil { + event := rsp.GetEvent() + subscribeInfo := make([]*metadata.SubscribeInfo, 0, len(event.GetSubscribeInfo())) + for _, sub := range event.GetSubscribeInfo() { + s, err := metadata.NewSubscribeInfo(sub) + if err != nil { + continue + } + subscribeInfo = append(subscribeInfo, s) + } + e := metadata.NewEvent(event.GetRebalanceId(), event.GetOpType(), subscribeInfo) + h.consumer.rmtDataCache.OfferEvent(e) + } +} + +func (h *heartbeatManager) nextHeartbeatInterval() time.Duration { + interval := h.consumer.config.Heartbeat.Interval + if h.consumer.masterHBRetry >= h.consumer.config.Heartbeat.MaxRetryTimes { + interval = h.consumer.config.Heartbeat.AfterFail + } + return interval +} + +func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) { + h.mu.Lock() + defer h.mu.Unlock() + + partitions := h.consumer.rmtDataCache.GetPartitionByBroker(broker) + if len(partitions) == 0 { + h.resetBrokerTimer(broker) + return + } + m := &metadata.Metadata{} + m.SetReadStatus(h.consumer.getConsumeReadStatus(false)) + m.SetNode(broker) + ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout) + defer cancel() + + rsp, err := h.consumer.client.HeartbeatRequestC2B(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache) + if err != nil { + return + } + if rsp.GetSuccess() { + if rsp.GetHasPartFailure() { + partitionKeys := make([]string, 0, len(rsp.GetFailureInfo())) + for _, fi := range rsp.GetFailureInfo() { + pos := strings.Index(fi, ":") + if pos == -1 { + continue + } + partition, err := metadata.NewPartition(fi[pos+1:]) + if err != nil { + continue + } + partitionKeys = append(partitionKeys, partition.GetPartitionKey()) + } + h.consumer.rmtDataCache.RemovePartition(partitionKeys) + } else { + if rsp.GetErrCode() == errs.RetCertificateFailure { + partitionKeys := make([]string, 0, len(partitions)) + for _, partition := range partitions { + partitionKeys = append(partitionKeys, partition.GetPartitionKey()) + } + h.consumer.rmtDataCache.RemovePartition(partitionKeys) + } + } + } + h.resetBrokerTimer(broker) +} + +func (h *heartbeatManager) resetBrokerTimer(broker *metadata.Node) { + interval := h.consumer.config.Heartbeat.Interval + partitions := h.consumer.rmtDataCache.GetPartitionByBroker(broker) + if len(partitions) == 0 { + delete(h.heartbeats, broker.GetAddress()) + } else { + hm := h.heartbeats[broker.GetAddress()] + hm.timer.Reset(interval) + } +} diff --git a/tubemq-client-twins/tubemq-client-go/client/remote.go b/tubemq-client-twins/tubemq-client-go/client/remote.go deleted file mode 100644 index e36b69f..0000000 --- a/tubemq-client-twins/tubemq-client-go/client/remote.go +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Package client defines the api and information -// which can be exposed to user. -package client - -import ( - "sync" - - "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata" -) - -// RmtDataCache represents the data returned from TubeMQ. -type RmtDataCache struct { - consumerID string - groupName string - underGroupCtrl bool - defFlowCtrlID int64 - groupFlowCtrlID int64 - subscribeInfo []*metadata.SubscribeInfo - rebalanceResults []*metadata.ConsumerEvent - mu sync.Mutex - brokerToPartitions map[*metadata.Node][]*metadata.Partition -} - -// GetUnderGroupCtrl returns the underGroupCtrl. -func (r *RmtDataCache) GetUnderGroupCtrl() bool { - return r.underGroupCtrl -} - -// GetDefFlowCtrlID returns the defFlowCtrlID. -func (r *RmtDataCache) GetDefFlowCtrlID() int64 { - return r.defFlowCtrlID -} - -// GetGroupFlowCtrlID returns the groupFlowCtrlID. -func (r *RmtDataCache) GetGroupFlowCtrlID() int64 { - return r.groupFlowCtrlID -} - -// GetSubscribeInfo returns the subscribeInfo. -func (r *RmtDataCache) GetSubscribeInfo() []*metadata.SubscribeInfo { - return r.subscribeInfo -} - -// PollEventResult polls the first event result from the rebalanceResults. -func (r *RmtDataCache) PollEventResult() *metadata.ConsumerEvent { - r.mu.Lock() - defer r.mu.Unlock() - if len(r.rebalanceResults) > 0 { - event := r.rebalanceResults[0] - r.rebalanceResults = r.rebalanceResults[1:] - return event - } - return nil -} - -// GetPartitionByBroker returns the -func (r *RmtDataCache) GetPartitionByBroker(node *metadata.Node) []*metadata.Partition { - if partitions, ok := r.brokerToPartitions[node]; ok { - return partitions - } - return nil -} diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go b/tubemq-client-twins/tubemq-client-go/config/config.go index 47a360a..7a93831 100644 --- a/tubemq-client-twins/tubemq-client-go/config/config.go +++ b/tubemq-client-twins/tubemq-client-go/config/config.go @@ -29,7 +29,7 @@ import ( // Config defines multiple configuration options. // Refer to: https://github.com/apache/incubator-inlong/blob/3249de37acf054a9c43677131cfbb09fc6d366d1/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java type Config struct { - // Net is the namespace for network-level properties used by Broker and Master. + // Net is the namespace for network-level properties used by broker and Master. Net struct { // ReadTimeout represents how long to wait for a response. ReadTimeout time.Duration @@ -61,9 +61,13 @@ type Config struct { // used by the consumer Consumer struct { // Masters is the addresses of master. - Masters []string - // Topic of the consumption. - Topic string + Masters string + // Topics of the consumption. + Topics []string + // TopicFilters is the map of topic to filters. + TopicFilters map[string][]string + // PartitionOffset is the map of partition to its corresponding offset. + PartitionOffset map[string]int64 // ConsumerPosition is the initial offset to use if no offset was previously committed. ConsumePosition int // Group is the consumer group name. @@ -152,7 +156,7 @@ func ParseAddress(address string) (config *Config, err error) { return nil, fmt.Errorf("address format invalid: address: %v, token: %v", address, tokens) } - c.Consumer.Masters = strings.Split(tokens[0], ",") + c.Consumer.Masters = tokens[0] tokens = strings.Split(tokens[1], "&") if len(tokens) == 0 { @@ -190,8 +194,8 @@ func getConfigFromToken(config *Config, values []string) error { config.Net.TLS.TLSServerName = values[1] case "group": config.Consumer.Group = values[1] - case "topic": - config.Consumer.Topic = values[1] + case "topics": + config.Consumer.Topics = strings.Split(values[1], ",") case "consumePosition": config.Consumer.ConsumePosition, err = strconv.Atoi(values[1]) case "boundConsume": diff --git a/tubemq-client-twins/tubemq-client-go/config/config_test.go b/tubemq-client-twins/tubemq-client-go/config/config_test.go index 56bc600..a8ac703 100644 --- a/tubemq-client-twins/tubemq-client-go/config/config_test.go +++ b/tubemq-client-twins/tubemq-client-go/config/config_test.go @@ -25,11 +25,11 @@ import ( ) func TestParseAddress(t *testing.T) { - address := "127.0.0.1:9092,127.0.0.1:9093?topic=Topic&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6" + address := "127.0.0.1:9092,127.0.0.1:9093?topics=Topic1,Topic2&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6" c, err := ParseAddress(address) assert.Nil(t, err) - assert.Equal(t, c.Consumer.Masters, []string{"127.0.0.1:9092", "127.0.0.1:9093"}) - assert.Equal(t, c.Consumer.Topic, "Topic") + assert.Equal(t, c.Consumer.Masters, "127.0.0.1:9092,127.0.0.1:9093") + assert.Equal(t, c.Consumer.Topics, []string{"Topic1", "Topic2"}) assert.Equal(t, c.Consumer.Group, "Group") assert.Equal(t, c.Consumer.MsgNotFoundWait, 10000*time.Millisecond) @@ -41,11 +41,11 @@ func TestParseAddress(t *testing.T) { _, err = ParseAddress(address) assert.NotNil(t, err) - address = "127.0.0.1:9092,127.0.0.1:9093?Topic=Topic&ttt" + address = "127.0.0.1:9092,127.0.0.1:9093?topics=Topic&ttt" _, err = ParseAddress(address) assert.NotNil(t, err) - address = "127.0.0.1:9092,127.0.0.1:9093?Topic=Topic&ttt=ttt" + address = "127.0.0.1:9092,127.0.0.1:9093?topics=Topic&ttt=ttt" _, err = ParseAddress(address) assert.NotNil(t, err) } diff --git a/tubemq-client-twins/tubemq-client-go/errs/errs.go b/tubemq-client-twins/tubemq-client-go/errs/errs.go index 00b3c81..eb45179 100644 --- a/tubemq-client-twins/tubemq-client-go/errs/errs.go +++ b/tubemq-client-twins/tubemq-client-go/errs/errs.go @@ -33,12 +33,18 @@ const ( RetAssertionFailure = 4 // RetRequestFailure represents the error code of request error. RetRequestFailure = 5 - // RetSelectorNotExist = 6 - RetSelectorNotExist = 6 + // RetSelectorNotExist represents the selector not exists. + RetSelectorNotExist = 6 + RetErrHBNoNode = 411 + RetCertificateFailure = 415 + RetConsumeGroupForbidden = 450 + RetConsumeContentForbidden = 455 ) // ErrAssertionFailure represents RetAssertionFailure error. -var ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure") +var ( + ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure") +) // Error provides a TubeMQ-specific error container type Error struct { diff --git a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go index 2b4ce49..6ce2915 100644 --- a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go +++ b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go @@ -25,6 +25,14 @@ type ConsumerEvent struct { subscribeInfo []*SubscribeInfo } +func NewEvent(rebalanceID int64, eventType int32, subscribeInfo []*SubscribeInfo) *ConsumerEvent { + return &ConsumerEvent{ + rebalanceID: rebalanceID, + eventType: eventType, + subscribeInfo: subscribeInfo, + } +} + // GetRebalanceID returns the rebalanceID of a consumer event. func (c *ConsumerEvent) GetRebalanceID() int64 { return c.rebalanceID diff --git a/tubemq-client-twins/tubemq-client-go/metadata/metadata.go b/tubemq-client-twins/tubemq-client-go/metadata/metadata.go index 861a06e..f64674a 100644 --- a/tubemq-client-twins/tubemq-client-go/metadata/metadata.go +++ b/tubemq-client-twins/tubemq-client-go/metadata/metadata.go @@ -45,3 +45,23 @@ func (m *Metadata) GetReadStatus() int32 { func (m *Metadata) GetReportTimes() bool { return m.reportTimes } + +// SetNode sets the node. +func (m *Metadata) SetNode(node *Node) { + m.node = node +} + +// SetSubscribeInfo sets the subscribeInfo. +func (m *Metadata) SetSubscribeInfo(sub *SubscribeInfo) { + m.subscribeInfo = sub +} + +// ReadStatus sets the status. +func (m *Metadata) SetReadStatus(status int32) { + m.readStatus = status +} + +// SetReportTimes sets the reportTimes. +func (m *Metadata) SetReportTimes(reportTimes bool) { + m.reportTimes = reportTimes +} diff --git a/tubemq-client-twins/tubemq-client-go/metadata/node.go b/tubemq-client-twins/tubemq-client-go/metadata/node.go index 625f278..bcfda82 100644 --- a/tubemq-client-twins/tubemq-client-go/metadata/node.go +++ b/tubemq-client-twins/tubemq-client-go/metadata/node.go @@ -19,6 +19,7 @@ package metadata import ( "strconv" + "strings" ) // Node represents the metadata of a node. @@ -29,6 +30,43 @@ type Node struct { address string } +// NewNode constructs a node from a given string. +// If the given string is invalid, it will return error. +func NewNode(isBroker bool, node string) (*Node, error) { + res := strings.Split(node, ":") + nodeID := 0 + host := "" + port := 8123 + var err error + if isBroker { + nodeID, err = strconv.Atoi(res[0]) + if err != nil { + return nil, err + } + host = res[1] + if len(res) >= 3 { + port, err = strconv.Atoi(res[2]) + if err != nil { + return nil, err + } + } + } else { + host = res[0] + if len(res) >= 2 { + port, err = strconv.Atoi(res[1]) + if err != nil { + return nil, err + } + } + } + return &Node{ + id: uint32(nodeID), + host: host, + port: uint32(port), + address: host + ":" + strconv.Itoa(port), + }, nil +} + // GetID returns the id of a node. func (n *Node) GetID() uint32 { return n.id @@ -53,3 +91,19 @@ func (n *Node) GetAddress() string { func (n *Node) String() string { return strconv.Itoa(int(n.id)) + ":" + n.host + ":" + strconv.Itoa(int(n.port)) } + +// SetHost sets the host. +func (n *Node) SetHost(host string) { + n.host = host +} + +// SetAddress sets the address. +func (n *Node) SetAddress(address string) error { + port, err := strconv.Atoi(strings.Split(address, ":")[1]) + if err != nil { + return err + } + n.address = address + n.port = uint32(port) + return nil +} diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go b/tubemq-client-twins/tubemq-client-go/metadata/partition.go index a180214..fbe1086 100644 --- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go +++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go @@ -19,18 +19,48 @@ package metadata import ( "strconv" + "strings" ) // Partition represents the metadata of a partition. type Partition struct { topic string - Broker *Node + broker *Node partitionID int32 partitionKey string offset int64 lastConsumed bool } +func NewPartition(partition string) (*Partition, error) { + var b *Node + var topic string + var partitionID int + var err error + pos := strings.Index(partition, "#") + if pos != -1 { + broker := partition[:pos] + b, err = NewNode(true, broker) + if err != nil { + return nil, err + } + p := partition[pos+1:] + pos = strings.Index(p, ":") + if pos != -1 { + topic = p[0:pos] + partitionID, err = strconv.Atoi(p[pos+1:]) + if err != nil { + return nil, err + } + } + } + return &Partition{ + topic: topic, + broker: b, + partitionID: int32(partitionID), + }, nil +} + // GetLastConsumed returns lastConsumed of a partition. func (p *Partition) GetLastConsumed() bool { return p.lastConsumed @@ -51,7 +81,15 @@ func (p *Partition) GetTopic() string { return p.topic } +func (p *Partition) GetBroker() *Node { + return p.broker +} + // String returns the metadata of a Partition as a string. func (p *Partition) String() string { - return p.Broker.String() + "#" + p.topic + "@" + strconv.Itoa(int(p.partitionID)) + return p.broker.String() + "#" + p.topic + "@" + strconv.Itoa(int(p.partitionID)) +} + +func (p *Partition) SetLastConsumed(lastConsumed bool) { + p.lastConsumed = lastConsumed } diff --git a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go index d76437a..90ae3ef 100644 --- a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go +++ b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go @@ -19,14 +19,14 @@ package metadata import ( "fmt" + "strings" ) // SubscribeInfo represents the metadata of the subscribe info. type SubscribeInfo struct { - group string - consumerID string - partition *Partition - qryPriorityID int32 + group string + consumerID string + partition *Partition } // GetGroup returns the group name. @@ -44,12 +44,48 @@ func (s *SubscribeInfo) GetPartition() *Partition { return s.partition } -// GetQryPriorityID returns the QryPriorityID. -func (s *SubscribeInfo) GetQryPriorityID() int32 { - return s.qryPriorityID -} - // String returns the contents of SubscribeInfo as a string. func (s *SubscribeInfo) String() string { return fmt.Sprintf("%s@%s-%s", s.consumerID, s.group, s.partition.String()) } + +// NewSubscribeInfo constructs a SubscribeInfo from a given string. +// If the given is invalid, it will return error. +func NewSubscribeInfo(subscribeInfo string) (*SubscribeInfo, error) { + consumerID := "" + group := "" + var partition *Partition + var err error + pos := strings.Index(subscribeInfo, "#") + if pos != -1 { + consumerInfo := subscribeInfo[:pos] + partitionInfo := subscribeInfo[pos+1:] + partition, err = NewPartition(partitionInfo) + if err != nil { + return nil, err + } + pos = strings.Index(consumerInfo, "@") + consumerID = consumerInfo[:pos] + group = consumerInfo[pos+1:] + } + return &SubscribeInfo{ + group: group, + consumerID: consumerID, + partition: partition, + }, nil +} + +// SetPartition sets the partition. +func (s *SubscribeInfo) SetPartition(partition *Partition) { + s.partition = partition +} + +// SetGroup sets the group. +func (s *SubscribeInfo) SetGroup(group string) { + s.group = group +} + +// SetConsumerID sets the consumerID. +func (s *SubscribeInfo) SetConsumerID(id string) { + s.consumerID = id +} diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go new file mode 100644 index 0000000..aa9a2d1 --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// package remote defines the remote data which is returned from TubeMQ. +package remote + +import ( + "sync" + "time" + + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util" +) + +// RmtDataCache represents the data returned from TubeMQ. +type RmtDataCache struct { + consumerID string + groupName string + underGroupCtrl bool + defFlowCtrlID int64 + groupFlowCtrlID int64 + partitionSubInfo map[string]*metadata.SubscribeInfo + rebalanceResults []*metadata.ConsumerEvent + eventMu sync.Mutex + metaMu sync.Mutex + dataBookMu sync.Mutex + brokerPartitions map[*metadata.Node]map[string]bool + qryPriorityID int32 + partitions map[string]*metadata.Partition + usedPartitions map[string]int64 + indexPartitions map[string]bool + partitionTimeouts map[string]*time.Timer + topicPartitions map[string]map[string]bool + partitionRegBooked map[string]bool +} + +// NewRmtDataCache returns a default rmtDataCache. +func NewRmtDataCache() *RmtDataCache { + return &RmtDataCache{ + defFlowCtrlID: util.InvalidValue, + groupFlowCtrlID: util.InvalidValue, + qryPriorityID: int32(util.InvalidValue), + partitionSubInfo: make(map[string]*metadata.SubscribeInfo), + rebalanceResults: make([]*metadata.ConsumerEvent, 0, 0), + brokerPartitions: make(map[*metadata.Node]map[string]bool), + partitions: make(map[string]*metadata.Partition), + usedPartitions: make(map[string]int64), + indexPartitions: make(map[string]bool), + partitionTimeouts: make(map[string]*time.Timer), + topicPartitions: make(map[string]map[string]bool), + partitionRegBooked: make(map[string]bool), + } +} + +// GetUnderGroupCtrl returns the underGroupCtrl. +func (r *RmtDataCache) GetUnderGroupCtrl() bool { + return r.underGroupCtrl +} + +// GetDefFlowCtrlID returns the defFlowCtrlID. +func (r *RmtDataCache) GetDefFlowCtrlID() int64 { + return r.defFlowCtrlID +} + +// GetGroupFlowCtrlID returns the groupFlowCtrlID. +func (r *RmtDataCache) GetGroupFlowCtrlID() int64 { + return r.groupFlowCtrlID +} + +// GetGroupName returns the group name. +func (r *RmtDataCache) GetGroupName() string { + return r.groupName +} + +// GetSubscribeInfo returns the partitionSubInfo. +func (r *RmtDataCache) GetSubscribeInfo() []*metadata.SubscribeInfo { + r.metaMu.Lock() + defer r.metaMu.Unlock() + subInfos := make([]*metadata.SubscribeInfo, 0, len(r.partitionSubInfo)) + for _, sub := range r.partitionSubInfo { + subInfos = append(subInfos, sub) + } + return subInfos +} + +// GetQryPriorityID returns the QryPriorityID. +func (r *RmtDataCache) GetQryPriorityID() int32 { + return r.qryPriorityID +} + +// PollEventResult polls the first event result from the rebalanceResults. +func (r *RmtDataCache) PollEventResult() *metadata.ConsumerEvent { + r.eventMu.Lock() + defer r.eventMu.Unlock() + if len(r.rebalanceResults) > 0 { + event := r.rebalanceResults[0] + r.rebalanceResults = r.rebalanceResults[1:] + return event + } + return nil +} + +// GetPartitionByBroker returns the subscribed partitions of the given broker. +func (r *RmtDataCache) GetPartitionByBroker(broker *metadata.Node) []*metadata.Partition { + r.metaMu.Lock() + defer r.metaMu.Unlock() + + if partitionMap, ok := r.brokerPartitions[broker]; ok { + partitions := make([]*metadata.Partition, 0, len(partitionMap)) + for partition := range partitionMap { + partitions = append(partitions, r.partitions[partition]) + } + return partitions + } + return nil +} + +// SetConsumerInfo sets the consumer information including consumerID and groupName. +func (r *RmtDataCache) SetConsumerInfo(consumerID string, group string) { + r.consumerID = consumerID + r.groupName = group +} + +// UpdateDefFlowCtrlInfo updates the defFlowCtrlInfo. +func (r *RmtDataCache) UpdateDefFlowCtrlInfo(flowCtrlID int64, flowCtrlInfo string) { + +} + +// UpdateGroupFlowCtrlInfo updates the groupFlowCtrlInfo. +func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID int64, flowCtrlInfo string) { + +} + +// OfferEvent offers an consumer event. +func (r *RmtDataCache) OfferEvent(event *metadata.ConsumerEvent) { + r.eventMu.Lock() + defer r.eventMu.Unlock() + r.rebalanceResults = append(r.rebalanceResults, event) +} + +// TakeEvent takes an event from the rebalanceResults. +func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent { + r.eventMu.Lock() + defer r.eventMu.Unlock() + if len(r.rebalanceResults) == 0 { + return nil + } + event := r.rebalanceResults[0] + r.rebalanceResults = r.rebalanceResults[1:] + return event +} + +// ClearEvent clears all the events. +func (r *RmtDataCache) ClearEvent() { + r.eventMu.Lock() + defer r.eventMu.Unlock() + r.rebalanceResults = r.rebalanceResults[:0] +} + +// RemoveAndGetPartition removes the given partitions. +func (r *RmtDataCache) RemoveAndGetPartition(subscribeInfos []*metadata.SubscribeInfo, processingRollback bool, partitions map[*metadata.Node][]*metadata.Partition) { + if len(subscribeInfos) == 0 { + return + } + r.metaMu.Lock() + defer r.metaMu.Unlock() + for _, sub := range subscribeInfos { + partitionKey := sub.GetPartition().GetPartitionKey() + if partition, ok := r.partitions[partitionKey]; ok { + if _, ok := r.usedPartitions[partitionKey]; ok { + if processingRollback { + partition.SetLastConsumed(false) + } else { + partition.SetLastConsumed(true) + } + } + if _, ok := partitions[partition.GetBroker()]; !ok { + partitions[partition.GetBroker()] = []*metadata.Partition{partition} + } else { + partitions[partition.GetBroker()] = append(partitions[partition.GetBroker()], partition) + } + r.removeMetaInfo(partitionKey) + } + r.resetIdlePartition(partitionKey, false) + } +} + +func (r *RmtDataCache) removeMetaInfo(partitionKey string) { + if partition, ok := r.partitions[partitionKey]; ok { + if partitions, ok := r.topicPartitions[partition.GetTopic()]; ok { + delete(partitions, partitionKey) + if len(partitions) == 0 { + delete(r.topicPartitions, partition.GetTopic()) + } + } + if partitions, ok := r.brokerPartitions[partition.GetBroker()]; ok { + delete(partitions, partition.GetPartitionKey()) + if len(partitions) == 0 { + delete(r.brokerPartitions, partition.GetBroker()) + } + } + delete(r.partitions, partitionKey) + delete(r.partitionSubInfo, partitionKey) + } +} + +func (r *RmtDataCache) resetIdlePartition(partitionKey string, reuse bool) { + delete(r.usedPartitions, partitionKey) + if timer, ok := r.partitionTimeouts[partitionKey]; ok { + if !timer.Stop() { + <-timer.C + } + timer.Stop() + delete(r.partitionTimeouts, partitionKey) + } + delete(r.indexPartitions, partitionKey) + if reuse { + if _, ok := r.partitions[partitionKey]; ok { + r.indexPartitions[partitionKey] = true + } + } +} + +// FilterPartitions returns the unsubscribed partitions. +func (r *RmtDataCache) FilterPartitions(subInfos []*metadata.SubscribeInfo) []*metadata.Partition { + r.metaMu.Lock() + defer r.metaMu.Unlock() + unsubPartitions := make([]*metadata.Partition, 0, len(subInfos)) + if len(r.partitions) == 0 { + for _, sub := range subInfos { + unsubPartitions = append(unsubPartitions, sub.GetPartition()) + } + } else { + for _, sub := range subInfos { + if _, ok := r.partitions[sub.GetPartition().GetPartitionKey()]; !ok { + unsubPartitions = append(unsubPartitions, sub.GetPartition()) + } + } + } + return unsubPartitions +} + +// AddNewPartition append a new partition. +func (r *RmtDataCache) AddNewPartition(newPartition *metadata.Partition) { + sub := &metadata.SubscribeInfo{} + sub.SetPartition(newPartition) + sub.SetConsumerID(r.consumerID) + sub.SetGroup(r.groupName) + + r.metaMu.Lock() + defer r.metaMu.Unlock() + partitionKey := newPartition.GetPartitionKey() + if partition, ok := r.partitions[partitionKey]; !ok { + r.partitions[partitionKey] = partition + if partitions, ok := r.topicPartitions[partition.GetPartitionKey()]; !ok { + newPartitions := make(map[string]bool) + newPartitions[partitionKey] = true + r.topicPartitions[partition.GetTopic()] = newPartitions + } else if _, ok := partitions[partitionKey]; !ok { + partitions[partitionKey] = true + } + if partitions, ok := r.brokerPartitions[partition.GetBroker()]; !ok { + newPartitions := make(map[string]bool) + newPartitions[partitionKey] = true + r.brokerPartitions[partition.GetBroker()] = newPartitions + } else if _, ok := partitions[partitionKey]; !ok { + partitions[partitionKey] = true + } + r.partitionSubInfo[partitionKey] = sub + } + r.resetIdlePartition(partitionKey, true) +} + +// HandleExpiredPartitions handles the expired partitions. +func (r *RmtDataCache) HandleExpiredPartitions(wait time.Duration) { + r.metaMu.Lock() + defer r.metaMu.Unlock() + expired := make(map[string]bool, len(r.usedPartitions)) + if len(r.usedPartitions) > 0 { + curr := time.Now().UnixNano() / int64(time.Millisecond) + for partition, time := range r.usedPartitions { + if curr-time > wait.Milliseconds() { + expired[partition] = true + if p, ok := r.partitions[partition]; ok { + p.SetLastConsumed(false) + } + } + } + if len(expired) > 0 { + for partition := range expired { + r.resetIdlePartition(partition, true) + } + } + } +} + +// RemovePartition removes the given partition keys. +func (r *RmtDataCache) RemovePartition(partitionKeys []string) { + r.metaMu.Lock() + defer r.metaMu.Unlock() + + for _, partitionKey := range partitionKeys { + r.resetIdlePartition(partitionKey, false) + r.removeMetaInfo(partitionKey) + } +} + +// IsFirstRegister returns whether the given partition is first registered. +func (r *RmtDataCache) IsFirstRegister(partitionKey string) bool { + r.dataBookMu.Lock() + defer r.dataBookMu.Unlock() + + if _, ok := r.partitionRegBooked[partitionKey]; !ok { + r.partitionRegBooked[partitionKey] = true + } + return r.partitionRegBooked[partitionKey] +} diff --git a/tubemq-client-twins/tubemq-client-go/rpc/broker.go b/tubemq-client-twins/tubemq-client-go/rpc/broker.go index 7f9ead2..5105fd5 100644 --- a/tubemq-client-twins/tubemq-client-go/rpc/broker.go +++ b/tubemq-client-twins/tubemq-client-go/rpc/broker.go @@ -22,11 +22,13 @@ import ( "github.com/golang/protobuf/proto" - "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util" ) const ( @@ -47,14 +49,14 @@ const ( ) // RegisterRequestC2B implements the RegisterRequestC2B interface according to TubeMQ RPC protocol. -func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) { +func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.RegisterResponseB2C, error) { reqC2B := &protocol.RegisterRequestC2B{ OpType: proto.Int32(register), ClientId: proto.String(sub.GetClientID()), GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()), TopicName: proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()), PartitionId: proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()), - QryPriorityId: proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()), + QryPriorityId: proto.Int32(r.GetQryPriorityID()), ReadStatus: proto.Int32(metadata.GetReadStatus()), AuthInfo: sub.GetAuthorizedInfo(), } @@ -66,7 +68,7 @@ func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.M } } offset := sub.GetAssignedPartOffset(metadata.GetSubscribeInfo().GetPartition().GetPartitionKey()) - if offset != client.InValidOffset { + if offset != util.InvalidValue { reqC2B.CurrOffset = proto.Int64(offset) } data, err := proto.Marshal(reqC2B) @@ -87,7 +89,7 @@ func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.M Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), } - rspBody, err := c.doRequest(ctx, req) + rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req) if err != nil { return nil, err } @@ -101,7 +103,7 @@ func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.M } // UnregisterRequestC2B implements the UnregisterRequestC2B interface according to TubeMQ RPC protocol. -func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) { +func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.RegisterResponseB2C, error) { reqC2B := &protocol.RegisterRequestC2B{ OpType: proto.Int32(unregister), ClientId: proto.String(sub.GetClientID()), @@ -129,7 +131,7 @@ func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata metadata. Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), } - rspBody, err := c.doRequest(ctx, req) + rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req) if err != nil { return nil, err } @@ -143,7 +145,7 @@ func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata metadata. } // GetMessageRequestC2B implements the GetMessageRequestC2B interface according to TubeMQ RPC protocol. -func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error) { +func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.GetMessageResponseB2C, error) { reqC2B := &protocol.GetMessageRequestC2B{ ClientId: proto.String(sub.GetClientID()), PartitionId: proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()), @@ -171,7 +173,7 @@ func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), } - rspBody, err := c.doRequest(ctx, req) + rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req) if err != nil { return nil, err } @@ -185,7 +187,7 @@ func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata } // CommitOffsetRequestC2B implements the CommitOffsetRequestC2B interface according to TubeMQ RPC protocol. -func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error) { +func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.CommitOffsetResponseB2C, error) { reqC2B := &protocol.CommitOffsetRequestC2B{ ClientId: proto.String(sub.GetClientID()), TopicName: proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()), @@ -206,12 +208,12 @@ func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metada ProtocolVer: proto.Int32(2), } req.RequestBody = &protocol.RequestBody{ - Method: proto.Int32(brokerConsumerHeartbeat), + Method: proto.Int32(brokerConsumerCommit), Request: data, Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), } - rspBody, err := c.doRequest(ctx, req) + rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req) if err != nil { return nil, err } @@ -225,12 +227,12 @@ func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metada } // HeartbeatRequestC2B implements the HeartbeatRequestC2B interface according to TubeMQ RPC protocol. -func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) { +func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) { reqC2B := &protocol.HeartBeatRequestC2B{ ClientId: proto.String(sub.GetClientID()), GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()), ReadStatus: proto.Int32(metadata.GetReadStatus()), - QryPriorityId: proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()), + QryPriorityId: proto.Int32(r.GetQryPriorityID()), AuthInfo: sub.GetAuthorizedInfo(), } partitions := r.GetPartitionByBroker(metadata.GetNode()) @@ -256,7 +258,7 @@ func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata *metadata. Flag: proto.Int32(0), } - rspBody, err := c.doRequest(ctx, req) + rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req) if err != nil { return nil, err } diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go b/tubemq-client-twins/tubemq-client-go/rpc/client.go index 71123c7..fab5bae 100644 --- a/tubemq-client-twins/tubemq-client-go/rpc/client.go +++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go @@ -21,13 +21,14 @@ package rpc import ( "context" - "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport" ) @@ -39,21 +40,21 @@ const ( // RPCClient is the rpc level client to interact with TubeMQ. type RPCClient interface { // RegisterRequestC2B is the rpc request for a consumer to register to a broker. - RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) + RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.RegisterResponseB2C, error) // UnregisterRequestC2B is the rpc request for a consumer to unregister to a broker. - UnregisterRequestC2B(ctx context.Context, metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) + UnregisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.RegisterResponseB2C, error) // GetMessageRequestC2B is the rpc request for a consumer to get message from a broker. - GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error) + GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.GetMessageResponseB2C, error) // CommitOffsetRequestC2B is the rpc request for a consumer to commit offset to a broker. - CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error) + CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.CommitOffsetResponseB2C, error) // HeartbeatRequestC2B is the rpc request for a consumer to send heartbeat to a broker. - HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) + HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) // RegisterRequestC2M is the rpc request for a consumer to register request to master. - RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error) + RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.RegisterResponseM2C, error) // HeartRequestC2M is the rpc request for a consumer to send heartbeat to master. - HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error) + HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartResponseM2C, error) // CloseRequestC2M is the rpc request for a consumer to be closed to master. - CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error) + CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.CloseResponseM2C, error) } // New returns a default TubeMQ rpc Client @@ -70,8 +71,8 @@ type rpcClient struct { config *config.Config } -func (c *rpcClient) doRequest(ctx context.Context, req codec.RPCRequest) (*protocol.RspResponseBody, error) { - rsp, err := c.client.DoRequest(ctx, req) +func (c *rpcClient) doRequest(ctx context.Context, address string, req codec.RPCRequest) (*protocol.RspResponseBody, error) { + rsp, err := c.client.DoRequest(ctx, address, req) if err != nil { return nil, errs.New(errs.RetRequestFailure, err.Error()) } diff --git a/tubemq-client-twins/tubemq-client-go/rpc/master.go b/tubemq-client-twins/tubemq-client-go/rpc/master.go index 9eb336a..c0d4bc3 100644 --- a/tubemq-client-twins/tubemq-client-go/rpc/master.go +++ b/tubemq-client-twins/tubemq-client-go/rpc/master.go @@ -22,11 +22,12 @@ import ( "github.com/golang/protobuf/proto" - "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub" ) const ( @@ -39,7 +40,7 @@ const ( ) // RegisterRequestRequestC2M implements the RegisterRequestRequestC2M interface according to TubeMQ RPC protocol. -func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error) { +func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.RegisterResponseM2C, error) { reqC2M := &protocol.RegisterRequestC2M{ ClientId: proto.String(sub.GetClientID()), HostName: proto.String(metadata.GetNode().GetHost()), @@ -48,7 +49,7 @@ func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.M SessionTime: proto.Int64(sub.GetSubscribedTime()), DefFlowCheckId: proto.Int64(r.GetDefFlowCtrlID()), GroupFlowCheckId: proto.Int64(r.GetGroupFlowCtrlID()), - QryPriorityId: proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()), + QryPriorityId: proto.Int32(r.GetQryPriorityID()), AuthInfo: sub.GetMasterCertificateIInfo(), } reqC2M.TopicList = make([]string, 0, len(sub.GetTopics())) @@ -92,7 +93,7 @@ func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.M Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), } - rspBody, err := c.doRequest(ctx, req) + rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req) if err != nil { return nil, err } @@ -106,14 +107,14 @@ func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.M } // HeartRequestC2M implements the HeartRequestC2M interface according to TubeMQ RPC protocol. -func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error) { +func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartResponseM2C, error) { reqC2M := &protocol.HeartRequestC2M{ ClientId: proto.String(sub.GetClientID()), GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()), DefFlowCheckId: proto.Int64(r.GetDefFlowCtrlID()), GroupFlowCheckId: proto.Int64(r.GetGroupFlowCtrlID()), ReportSubscribeInfo: proto.Bool(false), - QryPriorityId: proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()), + QryPriorityId: proto.Int32(r.GetQryPriorityID()), } event := r.PollEventResult() if event != nil || metadata.GetReportTimes() { @@ -159,7 +160,7 @@ func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Meta Flag: proto.Int32(0), } - rspBody, err := c.doRequest(ctx, req) + rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req) if err != nil { return nil, err } @@ -173,7 +174,7 @@ func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Meta } // CloseRequestC2M implements the CloseRequestC2M interface according to TubeMQ RPC protocol. -func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error) { +func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.CloseResponseM2C, error) { reqC2M := &protocol.CloseRequestC2M{ ClientId: proto.String(sub.GetClientID()), GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()), @@ -197,7 +198,7 @@ func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata *metadata.Meta Flag: proto.Int32(0), } - rspBody, err := c.doRequest(ctx, req) + rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req) if err != nil { return nil, err } diff --git a/tubemq-client-twins/tubemq-client-go/client/info.go b/tubemq-client-twins/tubemq-client-go/sub/info.go similarity index 62% rename from tubemq-client-twins/tubemq-client-go/client/info.go rename to tubemq-client-twins/tubemq-client-go/sub/info.go index 25a2686..737b08a 100644 --- a/tubemq-client-twins/tubemq-client-go/client/info.go +++ b/tubemq-client-twins/tubemq-client-go/sub/info.go @@ -15,16 +15,19 @@ * limitations under the License. */ -// Package client defines the api and information -// which can be exposed to user. -package client +// package sub defines the subscription information of a client. +package sub import ( + "strconv" + "time" + + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol" ) // InvalidOffset represents the offset which is invalid. -const InValidOffset = -1 +const InValidOffset = -2 // SubInfo represents the sub information of the client. type SubInfo struct { @@ -46,6 +49,46 @@ type SubInfo struct { masterCertificateInfo *protocol.MasterCertificateInfo } +// NewSubInfo parses the subscription from the config to SubInfo. +func NewSubInfo(config *config.Config) *SubInfo { + s := &SubInfo{ + boundConsume: config.Consumer.BoundConsume, + subscribedTime: time.Now().UnixNano() / int64(time.Millisecond), + firstRegistered: false, + topics: config.Consumer.Topics, + topicFilters: config.Consumer.TopicFilters, + } + s.topicConds = make([]string, 0, len(config.Consumer.TopicFilters)) + for topic, filters := range config.Consumer.TopicFilters { + cond := topic + "#" + count := 0 + for _, filter := range filters { + if count > 0 { + cond += "," + } + cond += filter + } + s.topicConds = append(s.topicConds, cond) + } + if config.Consumer.BoundConsume { + s.sessionKey = config.Consumer.SessionKey + s.sourceCount = int32(config.Consumer.SourceCount) + s.selectBig = config.Consumer.SelectBig + assignedPartitions := config.Consumer.PartitionOffset + count := 0 + for partition, offset := range assignedPartitions { + if count > 0 { + s.boundPartitions += "," + } + s.boundPartitions += partition + s.boundPartitions += "=" + s.boundPartitions += strconv.Itoa(int(offset)) + count++ + } + } + return s +} + // GetClientID returns the client ID. func (s *SubInfo) GetClientID() string { return s.clientID @@ -124,6 +167,32 @@ func (s *SubInfo) GetAuthorizedInfo() *protocol.AuthorizedInfo { return s.authInfo } +// GetMasterCertifateInfo returns the masterCertificateInfo. func (s *SubInfo) GetMasterCertificateIInfo() *protocol.MasterCertificateInfo { return s.masterCertificateInfo } + +// FirstRegistered sets the firstRegistered to true. +func (s *SubInfo) FirstRegistered() { + s.firstRegistered = true +} + +// SetAuthorizedInfo sets the authorizedInfo. +func (s *SubInfo) SetAuthorizedInfo(auth *protocol.AuthorizedInfo) { + s.authInfo = auth +} + +// SetMasterCertificateInfo sets the masterCertificateInfo. +func (s *SubInfo) SetMasterCertificateInfo(info *protocol.MasterCertificateInfo) { + s.masterCertificateInfo = info +} + +// SetIsNotAllocated sets the notAllocated. +func (s *SubInfo) SetIsNotAllocated(isNotAllocated bool) { + s.notAllocated = isNotAllocated +} + +// SetClientID sets the clientID. +func (s *SubInfo) SetClientID(clientID string) { + s.clientID = clientID +} diff --git a/tubemq-client-twins/tubemq-client-go/transport/client.go b/tubemq-client-twins/tubemq-client-go/transport/client.go index d816a7f..91c9692 100644 --- a/tubemq-client-twins/tubemq-client-go/transport/client.go +++ b/tubemq-client-twins/tubemq-client-go/transport/client.go @@ -28,8 +28,6 @@ import ( // Options represents the transport options type Options struct { - Address string - CACertFile string TLSCertFile string TLSKeyFile string @@ -51,9 +49,9 @@ func New(opts *Options, pool *multiplexing.Pool) *Client { } // DoRequest sends the request and return the decoded response -func (c *Client) DoRequest(ctx context.Context, req codec.RPCRequest) (codec.RPCResponse, error) { +func (c *Client) DoRequest(ctx context.Context, address string, req codec.RPCRequest) (codec.RPCResponse, error) { opts := &multiplexing.DialOptions{ - Address: c.opts.Address, + Address: address, Network: "tcp", } if c.opts.CACertFile != "" { @@ -63,7 +61,7 @@ func (c *Client) DoRequest(ctx context.Context, req codec.RPCRequest) (codec.RPC opts.TLSServerName = c.opts.TLSServerName } - conn, err := c.pool.Get(ctx, c.opts.Address, req.GetSerialNo(), opts) + conn, err := c.pool.Get(ctx, address, req.GetSerialNo(), opts) if err != nil { return nil, err } diff --git a/tubemq-client-twins/tubemq-client-go/util/util.go b/tubemq-client-twins/tubemq-client-go/util/util.go new file mode 100644 index 0000000..77422ac --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/util/util.go @@ -0,0 +1,52 @@ +package util + +import ( + "net" +) + +var InvalidValue = int64(-2) + +func GetLocalHost() string { + ifaces, err := net.Interfaces() + if err != nil { + return "" + } + for _, iface := range ifaces { + if iface.Flags&net.FlagUp == 0 { + continue // interface down + } + if iface.Flags&net.FlagLoopback != 0 { + continue // loopback interface + } + addrs, err := iface.Addrs() + if err != nil { + return "" + } + for _, addr := range addrs { + var ip net.IP + switch v := addr.(type) { + case *net.IPNet: + ip = v.IP + case *net.IPAddr: + ip = v.IP + } + if ip == nil || ip.IsLoopback() { + continue + } + ip = ip.To4() + if ip == nil { + continue // not an ipv4 address + } + return ip.String() + } + } + return "" +} + +func GenBrokerAuthenticateToken(username string, password string) string { + return "" +} + +func GenMasterAuthenticateToken(username string, password string) string { + return "" +}
