This is an automated email from the ASF dual-hosted git repository.

ztelur pushed a commit to branch eventmesh
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git


The following commit(s) were added to refs/heads/eventmesh by this push:
     new 1e75e1e  polish kafka init (#287)
1e75e1e is described below

commit 1e75e1ec87140fc838d661210d35a13b432627d5
Author: YuDong Tang <[email protected]>
AuthorDate: Mon Oct 25 19:24:08 2021 +0800

    polish kafka init (#287)
---
 go.sum                        |  1 +
 pkg/client/mq/kafka_facade.go | 36 +++++++++++++++++++----
 pkg/client/mq/mq.go           | 20 +++++--------
 pkg/filter/event/event.go     | 68 ++++++++++++++++++-------------------------
 4 files changed, 69 insertions(+), 56 deletions(-)

diff --git a/go.sum b/go.sum
index 8eb4eea..1f5ba1e 100644
--- a/go.sum
+++ b/go.sum
@@ -67,6 +67,7 @@ github.com/RoaringBitmap/roaring v0.6.1 h1:O36Tdaj1Fi/zyr25shTHwlQPGdq53+u4WkM08
 github.com/RoaringBitmap/roaring v0.6.1/go.mod h1:WZ83fjBF/7uBHi6QoFyfGL4+xuV4Qn+xFkm4+vSzrhE=
 github.com/Shopify/sarama v1.19.0 h1:9oksLxC6uxVPHPVYUmq6xhr1BOF/hHobWH2UzO67z1s=
 github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
+github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
 github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
 github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
 github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
diff --git a/pkg/client/mq/kafka_facade.go b/pkg/client/mq/kafka_facade.go
index f2abc21..99b3661 100644
--- a/pkg/client/mq/kafka_facade.go
+++ b/pkg/client/mq/kafka_facade.go
@@ -37,13 +37,24 @@ import (
        perrors "github.com/pkg/errors"
 )
 
-func NewKafkaConsumerFacade(addrs []string, config *sarama.Config) (*KafkaConsumerFacade, error) {
-       consumer, err := sarama.NewConsumer(addrs, config)
+func NewKafkaConsumerFacade(config event.KafkaConsumerConfig) (*KafkaConsumerFacade, error) {
+       c := sarama.NewConfig()
+       c.ClientID = config.ClientID
+       c.Metadata.Full = config.Metadata.Full
+       c.Metadata.Retry.Max = config.Metadata.Retry.Max
+       c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
+       if config.ProtocolVersion != "" {
+               version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
+               if err != nil {
+                       return nil, err
+               }
+               c.Version = version
+       }
+       consumer, err := sarama.NewConsumer(config.Brokers, c)
        if err != nil {
                return nil, err
        }
 
-       // does not set up cookiejar may cause some problem
        return &KafkaConsumerFacade{consumer: consumer, httpClient: &http.Client{Timeout: 5 * time.Second}, done: make(chan struct{})}, nil
 }
 
@@ -186,8 +197,23 @@ func (f *KafkaConsumerFacade) Stop() {
        f.wg.Wait()
 }
 
-func NewKafkaProviderFacade(addrs []string, config *sarama.Config) (*KafkaProducerFacade, error) {
-       producer, err := sarama.NewSyncProducer(addrs, config)
+func NewKafkaProviderFacade(config event.KafkaProducerConfig) (*KafkaProducerFacade, error) {
+       c := sarama.NewConfig()
+       c.Producer.Return.Successes = true
+       c.Producer.Return.Errors = true
+       c.Producer.RequiredAcks = sarama.WaitForLocal
+       c.Metadata.Full = config.Metadata.Full
+       c.Metadata.Retry.Max = config.Metadata.Retry.Max
+       c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
+       c.Producer.MaxMessageBytes = config.Producer.MaxMessageBytes
+       if config.ProtocolVersion != "" {
+               version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
+               if err != nil {
+                       return nil, err
+               }
+               c.Version = version
+       }
+       producer, err := sarama.NewSyncProducer(config.Brokers, c)
        if err != nil {
                return nil, err
        }
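
For context, a minimal sketch of how the reworked consumer constructor might be
called after this change. The broker address, protocol version, and retry values
below are invented for illustration and do not come from the commit:

    package example

    import (
            "time"

            "github.com/apache/dubbo-go-pixiu/pkg/client/mq"
            "github.com/apache/dubbo-go-pixiu/pkg/filter/event"
    )

    func buildConsumer() (*mq.KafkaConsumerFacade, error) {
            // All concrete values here are assumptions for illustration.
            cfg := event.KafkaConsumerConfig{
                    Brokers:         []string{"127.0.0.1:9092"}, // assumed local broker
                    ProtocolVersion: "2.8.0",                    // parsed via sarama.ParseKafkaVersion
                    ClientID:        "pixiu-kafka",
                    Metadata: event.Metadata{
                            Full:  true,
                            Retry: event.MetadataRetry{Max: 3, Backoff: 250 * time.Millisecond},
                    },
            }
            // An unparsable protocol_version surfaces as an error here.
            return mq.NewKafkaConsumerFacade(cfg)
    }
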
diff --git a/pkg/client/mq/mq.go b/pkg/client/mq/mq.go
index d05e51f..de57170 100644
--- a/pkg/client/mq/mq.go
+++ b/pkg/client/mq/mq.go
@@ -57,20 +57,16 @@ func NewSingletonMQClient(config event.Config) *Client {
 func NewMQClient(config event.Config) (*Client, error) {
        var c *Client
        ctx := context.Background()
-
-       sc := config.ToSaramaConfig()
-       addrs := strings.Split(config.Endpoints, ",")
-       cf, err := NewKafkaConsumerFacade(addrs, sc)
-       if err != nil {
-               return nil, err
-       }
-       pf, err := NewKafkaProviderFacade(addrs, sc)
-       if err != nil {
-               return nil, err
-       }
-
        switch config.MqType {
        case constant.MQTypeKafka:
+               cf, err := NewKafkaConsumerFacade(config.KafkaConsumerConfig)
+               if err != nil {
+                       return nil, err
+               }
+               pf, err := NewKafkaProviderFacade(config.KafkaProducerConfig)
+               if err != nil {
+                       return nil, err
+               }
                c = &Client{
                        ctx:            ctx,
                        consumerFacade: cf,
diff --git a/pkg/filter/event/event.go b/pkg/filter/event/event.go
index 56bfb0e..e367819 100644
--- a/pkg/filter/event/event.go
+++ b/pkg/filter/event/event.go
@@ -25,11 +25,6 @@ import (
        "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
        "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
        "github.com/apache/dubbo-go-pixiu/pkg/context/http"
-       "github.com/apache/dubbo-go-pixiu/pkg/logger"
-)
-
-import (
-       "github.com/Shopify/sarama"
 )
 
 const (
@@ -52,48 +47,43 @@ type (
        }
 
        Config struct {
-               ClientId     string        `yaml:"client_id" json:"client_id"`
-               Endpoints    string        `yaml:"endpoints" json:"endpoints"`
-               MqType       MQType        `yaml:"type" json:"type"`
-               KafkaVersion string        `yaml:"kafka_version" json:"kafka_version"`
-               Retry        int           `yaml:"retry" json:"retry" default:"5"`
-               Timeout      time.Duration `yaml:"timeout" json:"timeout" default:"2s"`
-               Offset       string        `yaml:"offset" json:"offset" default:"oldest"` // Offset newest or oldest
+               ClientId            string              `yaml:"client_id" json:"client_id"`
+               Endpoints           string              `yaml:"endpoints" json:"endpoints"`
+               MqType              MQType              `yaml:"type" json:"type"`
+               Retry               int                 `yaml:"retry" json:"retry" default:"5"`
+               Timeout             time.Duration       `yaml:"timeout" json:"timeout" default:"2s"`
+               KafkaConsumerConfig KafkaConsumerConfig `yaml:"kafka_consumer_config" json:"kafka_consumer_config"`
+               KafkaProducerConfig KafkaProducerConfig `yaml:"kafka_producer_config" json:"kafka_producer_config"`
        }
-)
-
-func (c *Config) ToSaramaConfig() *sarama.Config {
-       config := sarama.NewConfig()
 
-       version, err := sarama.ParseKafkaVersion(c.KafkaVersion)
-       if err != nil {
-               version = sarama.V2_0_0_0
-               logger.Warnf("kafka version is invalid, use sarama.V2_0_0_0 instead, err: %s", err.Error())
+       KafkaConsumerConfig struct {
+               Brokers         []string `yaml:"brokers" json:"brokers"`
+               ProtocolVersion string   `yaml:"protocol_version" json:"protocol_version"`
+               ClientID        string   `yaml:"client_id" json:"client_id"`
+               Metadata        Metadata `yaml:"metadata" json:"metadata"`
        }
-       config.Version = version
-
-       offset := sarama.OffsetNewest
-       switch c.Offset {
-       case "newest":
-               offset = sarama.OffsetNewest
-       case "oldest":
-               offset = sarama.OffsetOldest
-       default:
-               logger.Warn("offset is invalid, use oldest instead")
+
+       KafkaProducerConfig struct {
+               Brokers         []string `yaml:"brokers" json:"brokers"`
+               ProtocolVersion string   `yaml:"protocol_version" json:"protocol_version"`
+               Metadata        Metadata `yaml:"metadata" json:"metadata"`
+               Producer        Producer `yaml:"producer" json:"producer"`
        }
-       config.Consumer.Offsets.Initial = offset
 
-       config.ClientID = "pixiu-kafka"
-       if c.ClientId != "" {
-               config.ClientID = c.ClientId
+       Metadata struct {
+               Full  bool          `yaml:"full" json:"full"`
+               Retry MetadataRetry `yaml:"retry" json:"retry"`
        }
-       logger.Infof("kafka client id is %s", config.ClientID)
 
-       config.Producer.Retry.Max = c.Retry
-       config.Producer.Timeout = c.Timeout
+       MetadataRetry struct {
+               Max     int           `yaml:"max" json:"max"`
+               Backoff time.Duration `yaml:"backoff" json:"backoff"`
+       }
 
-       return config
-}
+       Producer struct {
+               MaxMessageBytes int `yaml:"max_message_bytes" json:"max_message_bytes"`
+       }
+)
 
 func (p *Plugin) Kind() string {
        return Kind

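With the old ToSaramaConfig helper gone, the filter config now nests all Kafka
settings under kafka_consumer_config and kafka_producer_config instead of the
flat Endpoints/KafkaVersion/Offset fields. A minimal sketch of the new shape as
a Go literal; the field names and yaml keys come from the structs above, while
every concrete value is invented for illustration:

    // Hypothetical event.Config after this commit; in YAML these fields map to
    // the kafka_consumer_config / kafka_producer_config blocks.
    cfg := event.Config{
            ClientId: "pixiu-kafka",
            MqType:   constant.MQTypeKafka,
            KafkaConsumerConfig: event.KafkaConsumerConfig{
                    Brokers:         []string{"127.0.0.1:9092"}, // assumed
                    ProtocolVersion: "2.8.0",                    // assumed
                    ClientID:        "pixiu-kafka",
            },
            KafkaProducerConfig: event.KafkaProducerConfig{
                    Brokers:         []string{"127.0.0.1:9092"}, // assumed
                    ProtocolVersion: "2.8.0",                    // assumed
                    Producer:        event.Producer{MaxMessageBytes: 1000000},
            },
    }
    client, err := mq.NewMQClient(cfg)

Note that because facade construction now happens inside the MQType switch in
NewMQClient, a Kafka connection is only attempted when the configured type is
actually kafka.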