This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch INLONG-25 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 3cf68fbad554f7a6cdaddf6fd1021b4595650dd6 Author: Zijie Lu <[email protected]> AuthorDate: Wed May 26 10:08:47 2021 +0800 Address review comments Signed-off-by: Zijie Lu <[email protected]> --- .../tubemq-client-go/config/config.go | 161 +++++++++++++-------- .../tubemq-client-go/config/config_test.go | 12 +- 2 files changed, 109 insertions(+), 64 deletions(-) diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go b/tubemq-client-twins/tubemq-client-go/config/config.go index f4b026e..d867d0c 100644 --- a/tubemq-client-twins/tubemq-client-go/config/config.go +++ b/tubemq-client-twins/tubemq-client-go/config/config.go @@ -27,13 +27,14 @@ import ( ) // Config defines multiple configuration options. +// Refer to: https://github.com/apache/incubator-inlong/blob/3249de37acf054a9c43677131cfbb09fc6d366d1/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java type Config struct { // Net is the namespace for network-level properties used by Broker and Master. Net struct { // How long to wait for a response. ReadTimeout time.Duration // TLS based authentication with broker and master. - TLS struct { + TLS struct { // Whether or not to use TLS. Enable bool // CACertFile for TLS. @@ -45,35 +46,68 @@ type Config struct { // TTSServerName for TLS. TLSServerName string } + // Account based authentication with broker and master. + Auth struct { + // Whether or not to use authentication. + Enable bool + // Username of authentication. + UserName string + // Password of authentication. + Password string + } } // Consumer is the namespace for configuration related to consume messages, // used by the consumer Consumer struct { - masters []string - topic string - offset int - Group string - boundConsume bool - sessionKey string - sourceCount int - selectBig bool - rollbackIfConfirmTimeout bool - maxSubInfoReportInterval int - maxPartCheckPeriod time.Duration - partCheckSlice time.Duration - msgNotFoundWait time.Duration - rebConfirmWait time.Duration - maxConfirmWait time.Duration - shutdownRebWait time.Duration + // The addresses of master. + Masters []string + // The consumption topic. + Topic string + // The initial offset to use if no offset was previously committed. + ConsumePosition int + // The consumer group name. + Group string + // Whether or not to specify the offset. + BoundConsume bool + // SessionKey is defined by the client. + // The session key will be the same in a batch. + SessionKey string + // The number of consumers in a batch. + SourceCount int + // If multiple consumers want to reset the offset of the same partition, + // whether the server or not to use the biggest offset + // the server will use the biggest offset if set. + // Otherwise the server will use the smallest offset. + SelectBig bool + // If the confirm request timeouts, whether this batch of data should be considered as successful. + // This batch of data will not be considered as successful if set. + RollbackIfConfirmTimeout bool + // The maximum interval for the client to report subscription information. + MaxSubInfoReportInterval int + // The maximum interval to check the partition. + MaxPartCheckPeriod time.Duration + // The interval to check the partition. + PartCheckSlice time.Duration + // The maximum wait time the offset of a partition has reached the maximum offset. + MsgNotFoundWait time.Duration + // How long to wait when the server is rebalancing and the partition is being occupied by the client. + RebConfirmWait time.Duration + // The maximum wait time a partition consumption command is released. + MaxConfirmWait time.Duration + // How long to wait when shutdown is called and the server is rebalancing. + ShutdownRebWait time.Duration } // Heartbeat is the namespace for configuration related to heartbeat messages, // used by the consumer Heartbeat struct { - interval time.Duration - maxRetryTimes int - afterFail time.Duration + // How frequently to send heartbeat. + Interval time.Duration + // The total number of times to retry sending heartbeat. + MaxRetryTimes int + // The heartbeat timeout after a heartbeat failure. + AfterFail time.Duration } } @@ -82,23 +116,28 @@ func newDefaultConfig() *Config { c.Net.ReadTimeout = 15000 * time.Millisecond c.Net.TLS.Enable = false - - c.Consumer.boundConsume = false - c.Consumer.sessionKey = "" - c.Consumer.sourceCount = 0 - c.Consumer.selectBig = true - c.Consumer.offset = 0 - c.Consumer.rollbackIfConfirmTimeout = true - c.Consumer.maxSubInfoReportInterval = 6 - c.Consumer.maxPartCheckPeriod = 60000 * time.Millisecond - c.Consumer.partCheckSlice = 300 * time.Millisecond - c.Consumer.rebConfirmWait = 3000 * time.Millisecond - c.Consumer.maxConfirmWait = 60000 * time.Millisecond - c.Consumer.shutdownRebWait = 10000 * time.Millisecond - - c.Heartbeat.interval = 10000 * time.Millisecond - c.Heartbeat.maxRetryTimes = 5 - c.Heartbeat.afterFail = 60000 * time.Millisecond + c.Net.Auth.Enable = false + c.Net.Auth.UserName = "" + c.Net.Auth.Password = "" + + c.Consumer.Group = "" + c.Consumer.BoundConsume = false + c.Consumer.SessionKey = "" + c.Consumer.SourceCount = 0 + c.Consumer.SelectBig = true + c.Consumer.ConsumePosition = 0 + c.Consumer.RollbackIfConfirmTimeout = true + c.Consumer.MaxSubInfoReportInterval = 6 + c.Consumer.MaxPartCheckPeriod = 60000 * time.Millisecond + c.Consumer.PartCheckSlice = 300 * time.Millisecond + c.Consumer.MsgNotFoundWait = 400 * time.Millisecond + c.Consumer.RebConfirmWait = 3000 * time.Millisecond + c.Consumer.MaxConfirmWait = 60000 * time.Millisecond + c.Consumer.ShutdownRebWait = 10000 * time.Millisecond + + c.Heartbeat.Interval = 10000 * time.Millisecond + c.Heartbeat.MaxRetryTimes = 5 + c.Heartbeat.AfterFail = 60000 * time.Millisecond return c } @@ -112,11 +151,11 @@ func ParseAddress(address string) (config *Config, err error) { return nil, fmt.Errorf("address format invalid: address: %v, token: %v", address, tokens) } - c.Consumer.masters = strings.Split(tokens[0], ",") + c.Consumer.Masters = strings.Split(tokens[0], ",") tokens = strings.Split(tokens[1], "&") if len(tokens) == 0 { - return nil, fmt.Errorf("address formata invalid: masters: %v with empty params", config.Consumer.masters) + return nil, fmt.Errorf("address formata invalid: Masters: %v with empty params", config.Consumer.Masters) } for _, token := range tokens { @@ -140,7 +179,7 @@ func getConfigFromToken(config *Config, values []string) error { config.Net.ReadTimeout, err = parseDuration(values[1]) case "tlsEnable": config.Net.TLS.Enable, err = strconv.ParseBool(values[1]) - case "caCertFile": + case "CACertFile": config.Net.TLS.CACertFile = values[1] case "tlsCertFile": config.Net.TLS.TLSCertFile = values[1] @@ -151,39 +190,45 @@ func getConfigFromToken(config *Config, values []string) error { case "group": config.Consumer.Group = values[1] case "topic": - config.Consumer.topic = values[1] - case "offset": - config.Consumer.offset, err = strconv.Atoi(values[1]) + config.Consumer.Topic = values[1] + case "consumePosition": + config.Consumer.ConsumePosition, err = strconv.Atoi(values[1]) case "boundConsume": - config.Consumer.boundConsume, err = strconv.ParseBool(values[1]) + config.Consumer.BoundConsume, err = strconv.ParseBool(values[1]) case "sessionKey": - config.Consumer.sessionKey = values[1] + config.Consumer.SessionKey = values[1] case "sourceCount": - config.Consumer.sourceCount, err = strconv.Atoi(values[1]) + config.Consumer.SourceCount, err = strconv.Atoi(values[1]) case "selectBig": - config.Consumer.selectBig, err = strconv.ParseBool(values[1]) + config.Consumer.SelectBig, err = strconv.ParseBool(values[1]) case "rollbackIfConfirmTimeout": - config.Consumer.rollbackIfConfirmTimeout, err = strconv.ParseBool(values[1]) + config.Consumer.RollbackIfConfirmTimeout, err = strconv.ParseBool(values[1]) case "maxSubInfoReportInterval": - config.Consumer.maxSubInfoReportInterval, err = strconv.Atoi(values[1]) + config.Consumer.MaxSubInfoReportInterval, err = strconv.Atoi(values[1]) case "maxPartCheckPeriod": - config.Consumer.maxPartCheckPeriod, err = parseDuration(values[1]) + config.Consumer.MaxPartCheckPeriod, err = parseDuration(values[1]) case "partCheckSlice": - config.Consumer.partCheckSlice, err = parseDuration(values[1]) + config.Consumer.PartCheckSlice, err = parseDuration(values[1]) case "msgNotFoundWait": - config.Consumer.msgNotFoundWait, err = parseDuration(values[1]) + config.Consumer.MsgNotFoundWait, err = parseDuration(values[1]) case "rebConfirmWait": - config.Consumer.rebConfirmWait, err = parseDuration(values[1]) + config.Consumer.RebConfirmWait, err = parseDuration(values[1]) case "maxConfirmWait": - config.Consumer.maxConfirmWait, err = parseDuration(values[1]) + config.Consumer.MaxConfirmWait, err = parseDuration(values[1]) case "shutdownRebWait": - config.Consumer.shutdownRebWait, err = parseDuration(values[1]) + config.Consumer.ShutdownRebWait, err = parseDuration(values[1]) case "heartbeatInterval": - config.Heartbeat.interval, err = parseDuration(values[1]) + config.Heartbeat.Interval, err = parseDuration(values[1]) case "heartbeatMaxRetryTimes": - config.Heartbeat.maxRetryTimes, err = strconv.Atoi(values[1]) + config.Heartbeat.MaxRetryTimes, err = strconv.Atoi(values[1]) case "heartbeatAfterFail": - config.Heartbeat.afterFail, err = parseDuration(values[1]) + config.Heartbeat.AfterFail, err = parseDuration(values[1]) + case "authEnable": + config.Net.Auth.Enable, err = strconv.ParseBool(values[1]) + case "authUserName": + config.Net.Auth.UserName = values[1] + case "authPassword": + config.Net.Auth.Password = values[1] default: return fmt.Errorf("address format invalid, unknown keys: %v", values[0]) } diff --git a/tubemq-client-twins/tubemq-client-go/config/config_test.go b/tubemq-client-twins/tubemq-client-go/config/config_test.go index acefe1c..56bc600 100644 --- a/tubemq-client-twins/tubemq-client-go/config/config_test.go +++ b/tubemq-client-twins/tubemq-client-go/config/config_test.go @@ -28,24 +28,24 @@ func TestParseAddress(t *testing.T) { address := "127.0.0.1:9092,127.0.0.1:9093?topic=Topic&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6" c, err := ParseAddress(address) assert.Nil(t, err) - assert.Equal(t, c.Consumer.masters, []string{"127.0.0.1:9092", "127.0.0.1:9093"}) - assert.Equal(t, c.Consumer.topic, "Topic") + assert.Equal(t, c.Consumer.Masters, []string{"127.0.0.1:9092", "127.0.0.1:9093"}) + assert.Equal(t, c.Consumer.Topic, "Topic") assert.Equal(t, c.Consumer.Group, "Group") - assert.Equal(t, c.Consumer.msgNotFoundWait, 10000 * time.Millisecond) + assert.Equal(t, c.Consumer.MsgNotFoundWait, 10000*time.Millisecond) assert.Equal(t, c.Net.TLS.Enable, false) - assert.Equal(t, c.Heartbeat.maxRetryTimes, 6) + assert.Equal(t, c.Heartbeat.MaxRetryTimes, 6) address = "" _, err = ParseAddress(address) assert.NotNil(t, err) - address = "127.0.0.1:9092,127.0.0.1:9093?topic=Topic&ttt" + address = "127.0.0.1:9092,127.0.0.1:9093?Topic=Topic&ttt" _, err = ParseAddress(address) assert.NotNil(t, err) - address = "127.0.0.1:9092,127.0.0.1:9093?topic=Topic&ttt=ttt" + address = "127.0.0.1:9092,127.0.0.1:9093?Topic=Topic&ttt=ttt" _, err = ParseAddress(address) assert.NotNil(t, err) }
