This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new 1ac164c  [INLONG-629]Go SDK Close API (#489)
1ac164c is described below

commit 1ac164c0bb9d8fd0f93d78835934ee0b0077ed58
Author: Zijie Lu <[email protected]>
AuthorDate: Fri Jul 2 17:05:16 2021 +0800

    [INLONG-629]Go SDK Close API (#489)
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 .../tubemq-client-go/client/consumer.go            |  2 +
 .../tubemq-client-go/client/consumer_impl.go       | 51 ++++++++++++++++++++--
 .../tubemq-client-go/client/heartbeat.go           | 13 ++++++
 .../tubemq-client-go/multiplexing/multiplexing.go  | 14 ++++++
 .../tubemq-client-go/remote/remote.go              | 17 ++++++++
 tubemq-client-twins/tubemq-client-go/rpc/client.go |  7 +++
 6 files changed, 101 insertions(+), 3 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go 
b/tubemq-client-twins/tubemq-client-go/client/consumer.go
index a4eec0b..bb17f03 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -41,4 +41,6 @@ type Consumer interface {
        Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
        // GetCurrConsumedInfo returns the consumptions of the consumer.
        GetCurrConsumedInfo() (map[string]*ConsumerOffset, error)
+       // Close closes the consumer client and releases the resources.
+       Close() error
 }
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go 
b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 031d761..616be17 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -320,14 +320,24 @@ func (c *consumer) GetCurrConsumedInfo() 
(map[string]*ConsumerOffset, error) {
        panic("implement me")
 }
 
+// Close implementation of TubeMQ consumer. The local cleanup always runs; the
+// error of the close request to the master, if any, is returned afterwards.
+func (c *consumer) Close() error {
+       close(c.done)
+       // Do not return early on failure: c.done is already closed, so bailing
+       // out would leave the brokers, timers and connections unreleased.
+       err := c.close2Master()
+       c.closeAllBrokers()
+       c.heartbeatManager.close()
+       c.client.Close()
+       return err
+}
+
 func (c *consumer) processRebalanceEvent() {
        for {
                select {
                case event, ok := <-c.rmtDataCache.EventCh:
                        if ok {
-                               if event.GetEventStatus() == 
int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
-                                       break
-                               }
                                c.rmtDataCache.ClearEvent()
                                switch event.GetEventType() {
                                case metadata.Disconnect, 
metadata.OnlyDisconnect:
@@ -620,3 +630,38 @@ func (c *consumer) convertMessages(filtered bool, topic 
string, rsp *protocol.Ge
        }
        return msgSize, msgs
 }
+
+// close2Master sends the close request of this consumer to the master and
+// turns an unsuccessful response into an error.
+func (c *consumer) close2Master() error {
+       ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+       defer cancel()
+       // The node carries the local host and the master address.
+       localNode := &metadata.Node{}
+       localNode.SetHost(util.GetLocalHost())
+       localNode.SetAddress(c.master.Address)
+       groupInfo := &metadata.SubscribeInfo{}
+       groupInfo.SetGroup(c.config.Consumer.Group)
+       meta := &metadata.Metadata{}
+       meta.SetNode(localNode)
+       meta.SetSubscribeInfo(groupInfo)
+       // Fill the authenticate info and attach it to the subscription info.
+       authInfo := &protocol.AuthenticateInfo{}
+       c.genMasterAuthenticateToken(authInfo, true)
+       c.subInfo.SetMasterCertificateInfo(&protocol.MasterCertificateInfo{AuthInfo: authInfo})
+       rsp, err := c.client.CloseRequestC2M(ctx, meta, c.subInfo)
+       switch {
+       case err != nil:
+               return err
+       case !rsp.GetSuccess():
+               return errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
+       }
+       return nil
+}
+
+// closeAllBrokers passes the cached per-broker partitions to unregister2Broker.
+func (c *consumer) closeAllBrokers() {
+       if parts := c.rmtDataCache.GetAllClosedBrokerParts(); len(parts) != 0 {
+               c.unregister2Broker(parts)
+       }
+}
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go 
b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index fdd0d20..88ac12c 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -234,3 +234,16 @@ func (h *heartbeatManager) resetBrokerTimer(broker 
*metadata.Node) {
                hm.timer.Reset(interval)
        }
 }
+
+// close stops every registered heartbeat timer and clears h.heartbeats.
+func (h *heartbeatManager) close() {
+       h.mu.Lock()
+       defer h.mu.Unlock()
+       for _, heartbeat := range h.heartbeats {
+               // Do not drain timer.C when Stop reports false: the channel is
+               // nil for time.AfterFunc timers and may already be empty, so a
+               // receive here could block forever while h.mu is held.
+               heartbeat.timer.Stop()
+       }
+       h.heartbeats = nil
+}
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go 
b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index 0c4239c..82c767a 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -175,6 +175,20 @@ func getCertPool(caCertFile string) (*x509.CertPool, 
error) {
        return nil, nil
 }
 
+// Close closes the done/mDone channels and the conn of every pooled connection.
+func (p *Pool) Close() {
+       p.connections.Range(func(key, value interface{}) bool {
+               // Skip an unexpected entry rather than returning false, which
+               // would end the Range and leave later connections open.
+               if connection, ok := value.(*Connection); ok {
+                       close(connection.done)
+                       close(connection.mDone)
+                       connection.conn.Close()
+               }
+               return true
+       })
+}
+
 type recvReader struct {
        ctx  context.Context
        recv chan codec.Response
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go 
b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index a5310e7..c645d3e 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -497,3 +497,20 @@ func (r *RmtDataCache) GetPartition(key string) 
*metadata.Partition {
        }
        return nil
 }
+
+// GetAllClosedBrokerParts will return the partitions which should be closed,
+// i.e. every partition currently held in r.partitions, grouped by the
+// *metadata.Node pointer returned from partition.GetBroker().
+func (r *RmtDataCache) GetAllClosedBrokerParts() map[*metadata.Node][]*metadata.Partition {
+       r.metaMu.Lock()
+       defer r.metaMu.Unlock()
+
+       brokerPartitions := make(map[*metadata.Node][]*metadata.Partition)
+       for _, partition := range r.partitions {
+               broker := partition.GetBroker()
+               // Store the result of append back into the map; appending to the
+               // missing (nil) entry also handles the first partition of a broker.
+               brokerPartitions[broker] = append(brokerPartitions[broker], partition)
+       }
+       return brokerPartitions
+}
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go 
b/tubemq-client-twins/tubemq-client-go/rpc/client.go
index fab5bae..64bba71 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/client.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -55,6 +55,8 @@ type RPCClient interface {
        HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub 
*sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartResponseM2C, error)
        // CloseRequestC2M is the rpc request for a consumer to be closed to 
master.
        CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub 
*sub.SubInfo) (*protocol.CloseResponseM2C, error)
+       // Close will close the rpc client and release the resource.
+       Close()
 }
 
 // New returns a default TubeMQ rpc Client
@@ -71,6 +73,11 @@ type rpcClient struct {
        config *config.Config
 }
 
+// Close releases the resources of the multiplexing pool via c.pool.Close.
+func (c *rpcClient) Close() {
+       c.pool.Close()
+}
+
 func (c *rpcClient) doRequest(ctx context.Context, address string, req 
codec.RPCRequest) (*protocol.RspResponseBody, error) {
        rsp, err := c.client.DoRequest(ctx, address, req)
        if err != nil {

Reply via email to