This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch INLONG-25 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 79e0dfab45d12883e6711e8c7785797deafd7f55 Author: Zijie Lu <[email protected]> AuthorDate: Tue May 25 15:45:18 2021 +0800 [INLONG-619]Configuration for Go SDK. Signed-off-by: Zijie Lu <[email protected]> --- .../tubemq-client-go/config/config.go | 172 ++++++++++++++++++++- .../tubemq-client-go/config/config_test.go | 54 +++++++ 2 files changed, 225 insertions(+), 1 deletion(-) diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go b/tubemq-client-twins/tubemq-client-go/config/config.go index f290f07..f4b026e 100644 --- a/tubemq-client-twins/tubemq-client-go/config/config.go +++ b/tubemq-client-twins/tubemq-client-go/config/config.go @@ -19,14 +19,184 @@ package config import ( + "fmt" + "net/url" + "strconv" + "strings" "time" ) // Config defines multiple configuration options. type Config struct { - // Net iis the namespace for network-level properties used by Broker and Master. + // Net is the namespace for network-level properties used by Broker and Master. Net struct { // How long to wait for a response. ReadTimeout time.Duration + // TLS based authentication with broker and master. + TLS struct { + // Whether or not to use TLS. + Enable bool + // CACertFile for TLS. + CACertFile string + // TLSCertFile for TLS. + TLSCertFile string + // TLSKeyFile for TLS. + TLSKeyFile string + // TLSServerName for TLS. 
+ TLSServerName string + } } + + // Consumer is the namespace for configuration related to consume messages, + // used by the consumer + Consumer struct { + masters []string + topic string + offset int + Group string + boundConsume bool + sessionKey string + sourceCount int + selectBig bool + rollbackIfConfirmTimeout bool + maxSubInfoReportInterval int + maxPartCheckPeriod time.Duration + partCheckSlice time.Duration + msgNotFoundWait time.Duration + rebConfirmWait time.Duration + maxConfirmWait time.Duration + shutdownRebWait time.Duration + } + + // Heartbeat is the namespace for configuration related to heartbeat messages, + // used by the consumer + Heartbeat struct { + interval time.Duration + maxRetryTimes int + afterFail time.Duration + } +} + +func newDefaultConfig() *Config { + c := &Config{} + + c.Net.ReadTimeout = 15000 * time.Millisecond + c.Net.TLS.Enable = false + + c.Consumer.boundConsume = false + c.Consumer.sessionKey = "" + c.Consumer.sourceCount = 0 + c.Consumer.selectBig = true + c.Consumer.offset = 0 + c.Consumer.rollbackIfConfirmTimeout = true + c.Consumer.maxSubInfoReportInterval = 6 + c.Consumer.maxPartCheckPeriod = 60000 * time.Millisecond + c.Consumer.partCheckSlice = 300 * time.Millisecond + c.Consumer.rebConfirmWait = 3000 * time.Millisecond + c.Consumer.maxConfirmWait = 60000 * time.Millisecond + c.Consumer.shutdownRebWait = 10000 * time.Millisecond + + c.Heartbeat.interval = 10000 * time.Millisecond + c.Heartbeat.maxRetryTimes = 5 + c.Heartbeat.afterFail = 60000 * time.Millisecond + + return c +} + +// ParseAddress parses the address to user-defined config. 
+func ParseAddress(address string) (config *Config, err error) { + c := newDefaultConfig() + + tokens := strings.SplitN(address, "?", 2) + if len(tokens) != 2 { + return nil, fmt.Errorf("address format invalid: address: %v, token: %v", address, tokens) + } + + c.Consumer.masters = strings.Split(tokens[0], ",") + + tokens = strings.Split(tokens[1], "&") + if len(tokens) == 0 { + return nil, fmt.Errorf("address formata invalid: masters: %v with empty params", config.Consumer.masters) + } + + for _, token := range tokens { + values := strings.SplitN(token, "=", 2) + if len(values) != 2 { + return nil, fmt.Errorf("address format invalid, key=value missing: %v", values) + } + + values[1], _ = url.QueryUnescape(values[1]) + if err := getConfigFromToken(c, values); err != nil { + return nil, err + } + } + return c, nil +} + +func getConfigFromToken(config *Config, values []string) error { + var err error + switch values[0] { + case "readTimeout": + config.Net.ReadTimeout, err = parseDuration(values[1]) + case "tlsEnable": + config.Net.TLS.Enable, err = strconv.ParseBool(values[1]) + case "caCertFile": + config.Net.TLS.CACertFile = values[1] + case "tlsCertFile": + config.Net.TLS.TLSCertFile = values[1] + case "tlsKeyFile": + config.Net.TLS.TLSKeyFile = values[1] + case "tlsServerName": + config.Net.TLS.TLSServerName = values[1] + case "group": + config.Consumer.Group = values[1] + case "topic": + config.Consumer.topic = values[1] + case "offset": + config.Consumer.offset, err = strconv.Atoi(values[1]) + case "boundConsume": + config.Consumer.boundConsume, err = strconv.ParseBool(values[1]) + case "sessionKey": + config.Consumer.sessionKey = values[1] + case "sourceCount": + config.Consumer.sourceCount, err = strconv.Atoi(values[1]) + case "selectBig": + config.Consumer.selectBig, err = strconv.ParseBool(values[1]) + case "rollbackIfConfirmTimeout": + config.Consumer.rollbackIfConfirmTimeout, err = strconv.ParseBool(values[1]) + case "maxSubInfoReportInterval": + 
config.Consumer.maxSubInfoReportInterval, err = strconv.Atoi(values[1]) + case "maxPartCheckPeriod": + config.Consumer.maxPartCheckPeriod, err = parseDuration(values[1]) + case "partCheckSlice": + config.Consumer.partCheckSlice, err = parseDuration(values[1]) + case "msgNotFoundWait": + config.Consumer.msgNotFoundWait, err = parseDuration(values[1]) + case "rebConfirmWait": + config.Consumer.rebConfirmWait, err = parseDuration(values[1]) + case "maxConfirmWait": + config.Consumer.maxConfirmWait, err = parseDuration(values[1]) + case "shutdownRebWait": + config.Consumer.shutdownRebWait, err = parseDuration(values[1]) + case "heartbeatInterval": + config.Heartbeat.interval, err = parseDuration(values[1]) + case "heartbeatMaxRetryTimes": + config.Heartbeat.maxRetryTimes, err = strconv.Atoi(values[1]) + case "heartbeatAfterFail": + config.Heartbeat.afterFail, err = parseDuration(values[1]) + default: + return fmt.Errorf("address format invalid, unknown keys: %v", values[0]) + } + if err != nil { + return fmt.Errorf("address format invalid(%v) err:%s", values[0], err.Error()) + } + return err +} + +func parseDuration(val string) (time.Duration, error) { + maxWait, err := strconv.Atoi(val) + if err != nil { + return 0, err + } + return time.Duration(maxWait) * time.Millisecond, err } diff --git a/tubemq-client-twins/tubemq-client-go/config/config_test.go b/tubemq-client-twins/tubemq-client-go/config/config_test.go new file mode 100644 index 0000000..f5d4a17 --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/config/config_test.go @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestParseAddress(t *testing.T) { + address := "127.0.0.1:9092,127.0.0.1:9093?topic=Topic&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6" + c, err := ParseAddress(address) + assert.Nil(t, err) + assert.Equal(t, c.Consumer.masters, []string{"127.0.0.1:9092", "127.0.0.1:9093"}) + assert.Equal(t, c.Consumer.topic, "Topic") + assert.Equal(t, c.Consumer.Group, "Group") + assert.Equal(t, c.Consumer.msgNotFoundWait, 10000 * time.Millisecond) + + assert.Equal(t, c.Net.TLS.Enable, false) + + assert.Equal(t, c.Heartbeat.maxRetryTimes, 6) + + address = "" + _, err = ParseAddress(address) + assert.NotNil(t, err) + + address = "127.0.0.1:9092,127.0.0.1:9093?topic=Topic&ttt" + _, err = ParseAddress(address) + assert.NotNil(t, err) + + address = "127.0.0.1:9092,127.0.0.1:9093?topic=Topic&ttt=ttt" + _, err = ParseAddress(address) + assert.NotNil(t, err) +} + + +
