This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 3cf68fbad554f7a6cdaddf6fd1021b4595650dd6
Author: Zijie Lu <[email protected]>
AuthorDate: Wed May 26 10:08:47 2021 +0800

    Address review comments
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 .../tubemq-client-go/config/config.go              | 161 +++++++++++++--------
 .../tubemq-client-go/config/config_test.go         |  12 +-
 2 files changed, 109 insertions(+), 64 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go 
b/tubemq-client-twins/tubemq-client-go/config/config.go
index f4b026e..d867d0c 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config.go
@@ -27,13 +27,14 @@ import (
 )
 
 // Config defines multiple configuration options.
+// Refer to: 
https://github.com/apache/incubator-inlong/blob/3249de37acf054a9c43677131cfbb09fc6d366d1/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
 type Config struct {
        // Net is the namespace for network-level properties used by Broker and 
Master.
        Net struct {
                // How long to wait for a response.
                ReadTimeout time.Duration
                // TLS based authentication with broker and master.
-               TLS         struct {
+               TLS struct {
                        // Whether or not to use TLS.
                        Enable bool
                        // CACertFile for TLS.
@@ -45,35 +46,68 @@ type Config struct {
                        // TTSServerName for TLS.
                        TLSServerName string
                }
+               // Account based authentication with broker and master.
+               Auth struct {
+                       // Whether or not to use authentication.
+                       Enable bool
+                       // Username of authentication.
+                       UserName string
+                       // Password of authentication.
+                       Password string
+               }
        }
 
        // Consumer is the namespace for configuration related to consume 
messages,
        // used by the consumer
        Consumer struct {
-               masters []string
-               topic string
-               offset int
-               Group                    string
-               boundConsume             bool
-               sessionKey               string
-               sourceCount              int
-               selectBig                bool
-               rollbackIfConfirmTimeout bool
-               maxSubInfoReportInterval int
-               maxPartCheckPeriod       time.Duration
-               partCheckSlice           time.Duration
-               msgNotFoundWait          time.Duration
-               rebConfirmWait           time.Duration
-               maxConfirmWait           time.Duration
-               shutdownRebWait          time.Duration
+               // The addresses of master.
+               Masters []string
+               // The consumption topic.
+               Topic string
+               // The initial offset to use if no offset was previously 
committed.
+               ConsumePosition int
+               // The consumer group name.
+               Group string
+               // Whether or not to specify the offset.
+               BoundConsume bool
+               // SessionKey is defined by the client.
+               // The session key will be the same in a batch.
+               SessionKey string
+               // The number of consumers in a batch.
+               SourceCount int
+               // If multiple consumers want to reset the offset of the same 
partition,
+               // whether or not the server should use the biggest offset.
+               // The server will use the biggest offset if set.
+               // Otherwise the server will use the smallest offset.
+               SelectBig bool
+               // If the confirm request times out, whether this batch of data 
should be considered as successful.
+               // This batch of data will not be considered as successful if 
set.
+               RollbackIfConfirmTimeout bool
+               // The maximum interval for the client to report subscription 
information.
+               MaxSubInfoReportInterval int
+               // The maximum interval to check the partition.
+               MaxPartCheckPeriod time.Duration
+               // The interval to check the partition.
+               PartCheckSlice time.Duration
+               // The maximum wait time when the offset of a partition has reached 
the maximum offset.
+               MsgNotFoundWait time.Duration
+               // How long to wait when the server is rebalancing and the 
partition is being occupied by the client.
+               RebConfirmWait time.Duration
+               // The maximum wait time a partition consumption command is 
released.
+               MaxConfirmWait time.Duration
+               // How long to wait when shutdown is called and the server is 
rebalancing.
+               ShutdownRebWait time.Duration
        }
 
        // Heartbeat is the namespace for configuration related to heartbeat 
messages,
        // used by the consumer
        Heartbeat struct {
-               interval      time.Duration
-               maxRetryTimes int
-               afterFail     time.Duration
+               // How frequently to send heartbeat.
+               Interval time.Duration
+               // The total number of times to retry sending heartbeat.
+               MaxRetryTimes int
+               // The heartbeat timeout after a heartbeat failure.
+               AfterFail time.Duration
        }
 }
 
@@ -82,23 +116,28 @@ func newDefaultConfig() *Config {
 
        c.Net.ReadTimeout = 15000 * time.Millisecond
        c.Net.TLS.Enable = false
-
-       c.Consumer.boundConsume = false
-       c.Consumer.sessionKey = ""
-       c.Consumer.sourceCount = 0
-       c.Consumer.selectBig = true
-       c.Consumer.offset = 0
-       c.Consumer.rollbackIfConfirmTimeout = true
-       c.Consumer.maxSubInfoReportInterval = 6
-       c.Consumer.maxPartCheckPeriod = 60000 * time.Millisecond
-       c.Consumer.partCheckSlice = 300 * time.Millisecond
-       c.Consumer.rebConfirmWait = 3000 * time.Millisecond
-       c.Consumer.maxConfirmWait = 60000 * time.Millisecond
-       c.Consumer.shutdownRebWait = 10000 * time.Millisecond
-
-       c.Heartbeat.interval = 10000 * time.Millisecond
-       c.Heartbeat.maxRetryTimes = 5
-       c.Heartbeat.afterFail = 60000 * time.Millisecond
+       c.Net.Auth.Enable = false
+       c.Net.Auth.UserName = ""
+       c.Net.Auth.Password = ""
+
+       c.Consumer.Group = ""
+       c.Consumer.BoundConsume = false
+       c.Consumer.SessionKey = ""
+       c.Consumer.SourceCount = 0
+       c.Consumer.SelectBig = true
+       c.Consumer.ConsumePosition = 0
+       c.Consumer.RollbackIfConfirmTimeout = true
+       c.Consumer.MaxSubInfoReportInterval = 6
+       c.Consumer.MaxPartCheckPeriod = 60000 * time.Millisecond
+       c.Consumer.PartCheckSlice = 300 * time.Millisecond
+       c.Consumer.MsgNotFoundWait = 400 * time.Millisecond
+       c.Consumer.RebConfirmWait = 3000 * time.Millisecond
+       c.Consumer.MaxConfirmWait = 60000 * time.Millisecond
+       c.Consumer.ShutdownRebWait = 10000 * time.Millisecond
+
+       c.Heartbeat.Interval = 10000 * time.Millisecond
+       c.Heartbeat.MaxRetryTimes = 5
+       c.Heartbeat.AfterFail = 60000 * time.Millisecond
 
        return c
 }
@@ -112,11 +151,11 @@ func ParseAddress(address string) (config *Config, err 
error) {
                return nil, fmt.Errorf("address format invalid: address: %v, 
token: %v", address, tokens)
        }
 
-       c.Consumer.masters = strings.Split(tokens[0], ",")
+       c.Consumer.Masters = strings.Split(tokens[0], ",")
 
        tokens = strings.Split(tokens[1], "&")
        if len(tokens) == 0 {
-               return nil, fmt.Errorf("address formata invalid: masters: %v 
with empty params", config.Consumer.masters)
+               return nil, fmt.Errorf("address formata invalid: Masters: %v 
with empty params", config.Consumer.Masters)
        }
 
        for _, token := range tokens {
@@ -140,7 +179,7 @@ func getConfigFromToken(config *Config, values []string) 
error {
                config.Net.ReadTimeout, err = parseDuration(values[1])
        case "tlsEnable":
                config.Net.TLS.Enable, err = strconv.ParseBool(values[1])
-       case "caCertFile":
+       case "CACertFile":
                config.Net.TLS.CACertFile = values[1]
        case "tlsCertFile":
                config.Net.TLS.TLSCertFile = values[1]
@@ -151,39 +190,45 @@ func getConfigFromToken(config *Config, values []string) 
error {
        case "group":
                config.Consumer.Group = values[1]
        case "topic":
-               config.Consumer.topic = values[1]
-       case "offset":
-               config.Consumer.offset, err = strconv.Atoi(values[1])
+               config.Consumer.Topic = values[1]
+       case "consumePosition":
+               config.Consumer.ConsumePosition, err = strconv.Atoi(values[1])
        case "boundConsume":
-               config.Consumer.boundConsume, err = strconv.ParseBool(values[1])
+               config.Consumer.BoundConsume, err = strconv.ParseBool(values[1])
        case "sessionKey":
-               config.Consumer.sessionKey = values[1]
+               config.Consumer.SessionKey = values[1]
        case "sourceCount":
-               config.Consumer.sourceCount, err = strconv.Atoi(values[1])
+               config.Consumer.SourceCount, err = strconv.Atoi(values[1])
        case "selectBig":
-               config.Consumer.selectBig, err = strconv.ParseBool(values[1])
+               config.Consumer.SelectBig, err = strconv.ParseBool(values[1])
        case "rollbackIfConfirmTimeout":
-               config.Consumer.rollbackIfConfirmTimeout, err = 
strconv.ParseBool(values[1])
+               config.Consumer.RollbackIfConfirmTimeout, err = 
strconv.ParseBool(values[1])
        case "maxSubInfoReportInterval":
-               config.Consumer.maxSubInfoReportInterval, err = 
strconv.Atoi(values[1])
+               config.Consumer.MaxSubInfoReportInterval, err = 
strconv.Atoi(values[1])
        case "maxPartCheckPeriod":
-               config.Consumer.maxPartCheckPeriod, err = 
parseDuration(values[1])
+               config.Consumer.MaxPartCheckPeriod, err = 
parseDuration(values[1])
        case "partCheckSlice":
-               config.Consumer.partCheckSlice, err = parseDuration(values[1])
+               config.Consumer.PartCheckSlice, err = parseDuration(values[1])
        case "msgNotFoundWait":
-               config.Consumer.msgNotFoundWait, err = parseDuration(values[1])
+               config.Consumer.MsgNotFoundWait, err = parseDuration(values[1])
        case "rebConfirmWait":
-               config.Consumer.rebConfirmWait, err = parseDuration(values[1])
+               config.Consumer.RebConfirmWait, err = parseDuration(values[1])
        case "maxConfirmWait":
-               config.Consumer.maxConfirmWait, err = parseDuration(values[1])
+               config.Consumer.MaxConfirmWait, err = parseDuration(values[1])
        case "shutdownRebWait":
-               config.Consumer.shutdownRebWait, err = parseDuration(values[1])
+               config.Consumer.ShutdownRebWait, err = parseDuration(values[1])
        case "heartbeatInterval":
-               config.Heartbeat.interval, err = parseDuration(values[1])
+               config.Heartbeat.Interval, err = parseDuration(values[1])
        case "heartbeatMaxRetryTimes":
-               config.Heartbeat.maxRetryTimes, err = strconv.Atoi(values[1])
+               config.Heartbeat.MaxRetryTimes, err = strconv.Atoi(values[1])
        case "heartbeatAfterFail":
-               config.Heartbeat.afterFail, err = parseDuration(values[1])
+               config.Heartbeat.AfterFail, err = parseDuration(values[1])
+       case "authEnable":
+               config.Net.Auth.Enable, err = strconv.ParseBool(values[1])
+       case "authUserName":
+               config.Net.Auth.UserName = values[1]
+       case "authPassword":
+               config.Net.Auth.Password = values[1]
        default:
                return fmt.Errorf("address format invalid, unknown keys: %v", 
values[0])
        }
diff --git a/tubemq-client-twins/tubemq-client-go/config/config_test.go 
b/tubemq-client-twins/tubemq-client-go/config/config_test.go
index acefe1c..56bc600 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config_test.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config_test.go
@@ -28,24 +28,24 @@ func TestParseAddress(t *testing.T) {
        address := 
"127.0.0.1:9092,127.0.0.1:9093?topic=Topic&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6"
        c, err := ParseAddress(address)
        assert.Nil(t, err)
-       assert.Equal(t, c.Consumer.masters, []string{"127.0.0.1:9092", 
"127.0.0.1:9093"})
-       assert.Equal(t, c.Consumer.topic, "Topic")
+       assert.Equal(t, c.Consumer.Masters, []string{"127.0.0.1:9092", 
"127.0.0.1:9093"})
+       assert.Equal(t, c.Consumer.Topic, "Topic")
        assert.Equal(t, c.Consumer.Group, "Group")
-       assert.Equal(t, c.Consumer.msgNotFoundWait, 10000 * time.Millisecond)
+       assert.Equal(t, c.Consumer.MsgNotFoundWait, 10000*time.Millisecond)
 
        assert.Equal(t, c.Net.TLS.Enable, false)
 
-       assert.Equal(t, c.Heartbeat.maxRetryTimes, 6)
+       assert.Equal(t, c.Heartbeat.MaxRetryTimes, 6)
 
        address = ""
        _, err = ParseAddress(address)
        assert.NotNil(t, err)
 
-       address = "127.0.0.1:9092,127.0.0.1:9093?topic=Topic&ttt"
+       address = "127.0.0.1:9092,127.0.0.1:9093?Topic=Topic&ttt"
        _, err = ParseAddress(address)
        assert.NotNil(t, err)
 
-       address = "127.0.0.1:9092,127.0.0.1:9093?topic=Topic&ttt=ttt"
+       address = "127.0.0.1:9092,127.0.0.1:9093?Topic=Topic&ttt=ttt"
        _, err = ParseAddress(address)
        assert.NotNil(t, err)
 }

Reply via email to