lvanneo commented on code in PR #7933: URL: https://github.com/apache/inlong/pull/7933#discussion_r1178887907
########## inlong-tubemq/tubemq-client-twins/tubemq-client-go/example/producer.go: ########## @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package main + +import ( + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/config" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log" +) + +func main() { + // Example for parseAddress + cfg, err := config.ParseAddress("127.0.0.1:8099?topic=demo") + + if err != nil { + log.Errorf("Failed to parse address", err.Error()) Review Comment: It is recommended to provide a format placeholder. ########## inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/producer_impl.go: ########## @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package client + +import ( + "context" + "os" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/config" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/metadata" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/multiplexing" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/protocol" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/transport" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/util" +) + +type producer struct { + clientID string + config *config.Config + nextAuth2Master int32 + selector selector.Selector + master *selector.Node + client rpc.RPCClient + masterHBRetry int + unreportedTimes int + publishTopics []string + brokerCheckSum int64 + brokerMap map[string]*metadata.Node + brokerMu sync.Mutex +} + +// NewProducer returns a producer which is constructed by a given config. +func NewProducer(config *config.Config) (Producer, error) { + if err := config.ValidateProducer(); err != nil { + return nil, err + } + log.Infof("The config of the producer is %s", config) + + selector, err := selector.Get("ip") + if err != nil { + return nil, err + } + + clientID := newProducerClientID() + pool := multiplexing.NewPool() + opts := &transport.Options{} + if config.Net.TLS.Enable { + opts.TLSEnable = true + opts.CACertFile = config.Net.TLS.CACertFile + opts.TLSCertFile = config.Net.TLS.TLSCertFile + opts.TLSKeyFile = config.Net.TLS.TLSKeyFile + opts.TLSServerName = config.Net.TLS.TLSServerName + } + + client := rpc.New(pool, opts, config) + + p := &producer{ + config: config, + clientID: clientID, + selector: selector, + client: client, + unreportedTimes: 0, + brokerMap: make(map[string]*metadata.Node), + publishTopics: config.Producer.Topics, + } + + err = p.register2Master(true) + if err != nil { + return nil, err + } + + log.Infof("[PRODUCER] start producer success, client=%s", clientID) + return p, nil +} + +func (p *producer) register2Master(needChange bool) error { + if needChange { + p.selector.Refresh(p.config.Producer.Masters) + node, err := p.selector.Select(p.config.Producer.Masters) + if err != nil { + return err + } + p.master = node + } + + retryCount := 0 + for { + rsp, err := p.sendRegRequest2Master() + if err != nil || !rsp.GetSuccess() { + if err != nil { + log.Errorf("[PRODUCER]register2Master error %s", err.Error()) + } + if !p.master.HasNext { + if err != nil { + return err + } + if rsp != nil { + log.Errorf("[PRODUCER] register2Master(%s) failure exist register, client=%s, errCode: %d, errorMsg: %s", + p.master.Address, p.clientID, rsp.GetErrCode(), rsp.GetErrMsg()) + } + break + } + retryCount++ + log.Warnf("[PRODUCER] register2master(%s) failure, client=%s, retry count=%d", + p.master.Address, p.clientID, retryCount) + if p.master, err = p.selector.Select(p.config.Consumer.Masters); err != nil { + return err + } + continue + } + log.Infof("register2Master response %s", rsp.String()) + p.masterHBRetry = 0 + p.processRegisterResponseM2P(rsp) + break + } + return nil +} + +func (p *producer) sendRegRequest2Master() (*protocol.RegisterResponseM2P, error) { + ctx, cancel := context.WithTimeout(context.Background(), p.config.Net.ReadTimeout) + defer cancel() + + m := &metadata.Metadata{} + node := &metadata.Node{} + node.SetHost(util.GetLocalHost()) + node.SetAddress(p.master.Address) + + auth := &protocol.AuthenticateInfo{} + if p.needGenMasterCertificateInfo(true) { + util.GenMasterAuthenticateToken(auth, p.config.Net.Auth.UserName, p.config.Net.Auth.Password) + } + + m.SetNode(node) + + rsp, err := p.client.RegisterRequestP2M(ctx, m, p.clientID) + return rsp, err +} + +func (p *producer) processRegisterResponseM2P(rsp *protocol.RegisterResponseM2P) { + p.brokerCheckSum = rsp.GetBrokerCheckSum() + brokerInfos := rsp.GetBrokerInfos() + p.updateBrokerInfoList(brokerInfos) +} + +func newProducerClientID() string { + return util.GetLocalHost() + "-" + + strconv.Itoa(os.Getpid()) + "-" + + strconv.Itoa(int(time.Now().Unix()*1000)) + "-" + + strconv.Itoa(int(atomic.AddUint64(&clientID, 1))) + "-" + + "go-" + + tubeMQClientVersion +} + +func (p *producer) needGenMasterCertificateInfo(force bool) bool { + needAdd := false + if p.config.Net.Auth.Enable { + if force { + needAdd = true + atomic.StoreInt32(&p.nextAuth2Master, 0) + } else if atomic.LoadInt32(&p.nextAuth2Master) == 1 { + if atomic.CompareAndSwapInt32(&p.nextAuth2Master, 1, 0) { + needAdd = true + } + } + } + return needAdd +} + +func (p *producer) updateBrokerInfoList(brokerInfos []string) { + p.brokerMu.Lock() + defer p.brokerMu.Unlock() + for _, brokerInfo := range brokerInfos { + node, _ := metadata.NewNode(true, strings.Trim(brokerInfo, " :")) Review Comment: Handling the error is recommended. ########## inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/producer_impl.go: ########## @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package client + +import ( + "context" + "os" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/config" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/metadata" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/multiplexing" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/protocol" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/transport" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/util" +) + +type producer struct { + clientID string + config *config.Config + nextAuth2Master int32 + selector selector.Selector + master *selector.Node + client rpc.RPCClient + masterHBRetry int + unreportedTimes int + publishTopics []string + brokerCheckSum int64 + brokerMap map[string]*metadata.Node + brokerMu sync.Mutex +} + +// NewProducer returns a producer which is constructed by a given config. +func NewProducer(config *config.Config) (Producer, error) { + if err := config.ValidateProducer(); err != nil { + return nil, err + } + log.Infof("The config of the producer is %s", config) + + selector, err := selector.Get("ip") + if err != nil { + return nil, err + } + + clientID := newProducerClientID() + pool := multiplexing.NewPool() + opts := &transport.Options{} + if config.Net.TLS.Enable { + opts.TLSEnable = true + opts.CACertFile = config.Net.TLS.CACertFile + opts.TLSCertFile = config.Net.TLS.TLSCertFile + opts.TLSKeyFile = config.Net.TLS.TLSKeyFile + opts.TLSServerName = config.Net.TLS.TLSServerName + } + + client := rpc.New(pool, opts, config) + + p := &producer{ + config: config, + clientID: clientID, + selector: selector, + client: client, + unreportedTimes: 0, + brokerMap: make(map[string]*metadata.Node), + publishTopics: config.Producer.Topics, + } + + err = p.register2Master(true) + if err != nil { + return nil, err + } + + log.Infof("[PRODUCER] start producer success, client=%s", clientID) + return p, nil +} + +func (p *producer) register2Master(needChange bool) error { + if needChange { + p.selector.Refresh(p.config.Producer.Masters) + node, err := p.selector.Select(p.config.Producer.Masters) + if err != nil { + return err + } + p.master = node + } + + retryCount := 0 + for { + rsp, err := p.sendRegRequest2Master() + if err != nil || !rsp.GetSuccess() { + if err != nil { + log.Errorf("[PRODUCER]register2Master error %s", err.Error()) + } + if !p.master.HasNext { + if err != nil { + return err + } + if rsp != nil { + log.Errorf("[PRODUCER] register2Master(%s) failure exist register, client=%s, errCode: %d, errorMsg: %s", + p.master.Address, p.clientID, rsp.GetErrCode(), rsp.GetErrMsg()) + } + break + } + retryCount++ + log.Warnf("[PRODUCER] register2master(%s) failure, client=%s, retry count=%d", + p.master.Address, p.clientID, retryCount) + if p.master, err = p.selector.Select(p.config.Consumer.Masters); err != nil { + return err + } Review Comment: Why is this code block not controlled by the needChange condition? ########## inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/producer_impl.go: ########## @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package client + Review Comment: What does the impl in the file name producer_impl.go stand for? ########## inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/producer.go: ########## @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package client + +type Producer interface { Review Comment: It is recommended to add comments for exportable interfaces. ########## inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/producer.go: ########## @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package client + +type Producer interface { Review Comment: golang's interface is a duck type, so you don't need to do this definition, you can do it on demand. ########## inlong-tubemq/tubemq-client-twins/tubemq-client-go/example/producer.go: ########## @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package main + +import ( + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/config" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log" +) + +func main() { + // Example for parseAddress + cfg, err := config.ParseAddress("127.0.0.1:8099?topic=demo") + Review Comment: Meaningless empty line, advised to delete this line. ########## inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/producer_impl.go: ########## @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package client + +import ( + "context" + "os" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/config" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/metadata" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/multiplexing" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/protocol" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/transport" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/util" +) + +type producer struct { + clientID string + config *config.Config + nextAuth2Master int32 + selector selector.Selector + master *selector.Node + client rpc.RPCClient + masterHBRetry int + unreportedTimes int + publishTopics []string + brokerCheckSum int64 + brokerMap map[string]*metadata.Node + brokerMu sync.Mutex +} + +// NewProducer returns a producer which is constructed by a given config. +func NewProducer(config *config.Config) (Producer, error) { Review Comment: Why does the NewProducer method return an empty interface? ########## inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/producer_impl.go: ########## @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package client + +import ( + "context" + "os" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/config" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/metadata" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/multiplexing" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/protocol" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/transport" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/util" +) + +type producer struct { + clientID string + config *config.Config + nextAuth2Master int32 + selector selector.Selector + master *selector.Node + client rpc.RPCClient + masterHBRetry int + unreportedTimes int + publishTopics []string + brokerCheckSum int64 + brokerMap map[string]*metadata.Node + brokerMu sync.Mutex +} + +// NewProducer returns a producer which is constructed by a given config. Review Comment: The comment meaning does not match the code logic. ########## inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/producer_impl.go: ########## @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package client + +import ( + "context" + "os" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/config" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/metadata" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/multiplexing" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/protocol" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/transport" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/util" +) + +type producer struct { + clientID string + config *config.Config + nextAuth2Master int32 + selector selector.Selector + master *selector.Node + client rpc.RPCClient + masterHBRetry int + unreportedTimes int + publishTopics []string + brokerCheckSum int64 + brokerMap map[string]*metadata.Node + brokerMu sync.Mutex +} + +// NewProducer returns a producer which is constructed by a given config. +func NewProducer(config *config.Config) (Producer, error) { + if err := config.ValidateProducer(); err != nil { + return nil, err + } + log.Infof("The config of the producer is %s", config) + + selector, err := selector.Get("ip") + if err != nil { + return nil, err + } + + clientID := newProducerClientID() + pool := multiplexing.NewPool() + opts := &transport.Options{} + if config.Net.TLS.Enable { + opts.TLSEnable = true + opts.CACertFile = config.Net.TLS.CACertFile + opts.TLSCertFile = config.Net.TLS.TLSCertFile + opts.TLSKeyFile = config.Net.TLS.TLSKeyFile + opts.TLSServerName = config.Net.TLS.TLSServerName + } + + client := rpc.New(pool, opts, config) + + p := &producer{ + config: config, + clientID: clientID, + selector: selector, + client: client, + unreportedTimes: 0, + brokerMap: make(map[string]*metadata.Node), + publishTopics: config.Producer.Topics, + } + + err = p.register2Master(true) + if err != nil { + return nil, err + } + + log.Infof("[PRODUCER] start producer success, client=%s", clientID) + return p, nil +} + +func (p *producer) register2Master(needChange bool) error { + if needChange { + p.selector.Refresh(p.config.Producer.Masters) + node, err := p.selector.Select(p.config.Producer.Masters) + if err != nil { + return err + } + p.master = node + } + + retryCount := 0 + for { + rsp, err := p.sendRegRequest2Master() + if err != nil || !rsp.GetSuccess() { + if err != nil { + log.Errorf("[PRODUCER]register2Master error %s", err.Error()) + } + if !p.master.HasNext { + if err != nil { + return err + } + if rsp != nil { + log.Errorf("[PRODUCER] register2Master(%s) failure exist register, client=%s, errCode: %d, errorMsg: %s", + p.master.Address, p.clientID, rsp.GetErrCode(), rsp.GetErrMsg()) + } + break + } Review Comment: The level of code abstraction is inconsistent, and the level of code nesting is too deep. ########## inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/producer.go: ########## @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package client + +type Producer interface { Review Comment: Why do you need an empty interface? ########## inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/producer_impl.go: ########## @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package client + +import ( + "context" + "os" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/config" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/metadata" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/multiplexing" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/protocol" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/transport" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/util" +) + +type producer struct { + clientID string + config *config.Config + nextAuth2Master int32 + selector selector.Selector + master *selector.Node + client rpc.RPCClient + masterHBRetry int + unreportedTimes int + publishTopics []string + brokerCheckSum int64 + brokerMap map[string]*metadata.Node + brokerMu sync.Mutex +} + +// NewProducer returns a producer which is constructed by a given config. +func NewProducer(config *config.Config) (Producer, error) { Review Comment: The NewProducer method should only accomplish instance creation and should maintain a single responsibility. ########## inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/producer_impl.go: ########## @@ -0,0 +1,189 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package client + +import ( + "context" + "strconv" + "strings" + "sync" + "sync/atomic" + + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/config" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/metadata" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/multiplexing" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/protocol" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/transport" + "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/util" +) + +type producer struct { + clientID string + config *config.Config + nextAuth2Master int32 + selector selector.Selector + master *selector.Node + client rpc.RPCClient + masterHBRetry int + unreportedTimes int + publishTopics []string + brokerCheckSum int64 + brokerMap map[string]*metadata.Node + brokerMu sync.Mutex +} + +// NewProducer returns a producer which is constructed by a given config. +func NewProducer(config *config.Config) (Producer, error) { Review Comment: golang's interface is of duck type and does not need to be bound to the implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
