This is an automated email from the ASF dual-hosted git repository.

ztelur pushed a commit to branch eventmesh
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git


The following commit(s) were added to refs/heads/eventmesh by this push:
     new 3b37a3c  Feat: event mesh (#286)
3b37a3c is described below

commit 3b37a3c116cffc2aba6699cd526a56e1ee2c2429
Author: MasterKenway <[email protected]>
AuthorDate: Sat Oct 23 22:36:17 2021 +0800

    Feat: event mesh (#286)
    
    * feat: mq facade definition
    
    * feat: mq facade definition (kafka impl)
    
    * feat: mq type definition
    
    * feat: msg definition
    
    * feat: mq facade definition (kafka impl)
    
    * feat: init mq filter
    
    * feat: init mq client
    
    * feat: make params config with Options implement with builder mode
    
    * fix: fit dynamic options
    
    * feat: add producer facade
    
    * docs: add lic header
    
    * feat: call facade
    
    * fix: resolve import cycle
    
    * fix: use chan as stop signal
    
    * fix: go lint
---
 go.mod                        |   1 +
 go.sum                        |   6 ++
 pkg/client/mq/facade.go       |  96 +++++++++++++++++++
 pkg/client/mq/kafka_facade.go | 211 ++++++++++++++++++++++++++++++++++++++++++
 pkg/client/mq/mq.go           | 145 +++++++++++++++++++++++++++++
 pkg/common/constant/key.go    |   6 ++
 pkg/filter/event/event.go     | 122 ++++++++++++++++++++++++
 pkg/filter/event/msg.go       |  63 +++++++++++++
 8 files changed, 650 insertions(+)

diff --git a/go.mod b/go.mod
index b227777..6443cfd 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module github.com/apache/dubbo-go-pixiu
 go 1.14
 
 require (
+       github.com/Shopify/sarama v1.19.0
        github.com/alibaba/sentinel-golang v1.0.2
        github.com/apache/dubbo-go v1.5.7
        github.com/apache/dubbo-go-hessian2 v1.9.3
diff --git a/go.sum b/go.sum
index 81bc996..8eb4eea 100644
--- a/go.sum
+++ b/go.sum
@@ -65,6 +65,7 @@ github.com/PuerkitoBio/purell v1.0.0/go.mod 
h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt
 github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod 
h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
 github.com/RoaringBitmap/roaring v0.6.1 
h1:O36Tdaj1Fi/zyr25shTHwlQPGdq53+u4WkM08AOEjiE=
 github.com/RoaringBitmap/roaring v0.6.1/go.mod 
h1:WZ83fjBF/7uBHi6QoFyfGL4+xuV4Qn+xFkm4+vSzrhE=
+github.com/Shopify/sarama v1.19.0 
h1:9oksLxC6uxVPHPVYUmq6xhr1BOF/hHobWH2UzO67z1s=
 github.com/Shopify/sarama v1.19.0/go.mod 
h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
 github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod 
h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
 github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod 
h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
@@ -195,8 +196,11 @@ github.com/dubbogo/jsonparser v1.0.1/go.mod 
h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWb
 github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod 
h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
 github.com/dustin/go-humanize v1.0.0 
h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
 github.com/dustin/go-humanize v1.0.0/go.mod 
h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
+github.com/eapache/go-resiliency v1.1.0 
h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
 github.com/eapache/go-resiliency v1.1.0/go.mod 
h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
+github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 
h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod 
h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
+github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
 github.com/eapache/queue v1.1.0/go.mod 
h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
 github.com/edsrzf/mmap-go v1.0.0/go.mod 
h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
 github.com/elazarl/go-bindata-assetfs 
v0.0.0-20160803192304-e1a2a7ec64b0/go.mod 
h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4=
@@ -616,6 +620,7 @@ github.com/performancecopilot/speed 
v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9
 github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod 
h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
 github.com/philhofer/fwd v1.0.0/go.mod 
h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
 github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod 
h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
+github.com/pierrec/lz4 v2.0.5+incompatible 
h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod 
h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
 github.com/pkg/errors v0.8.0/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.8.1/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -669,6 +674,7 @@ github.com/prometheus/procfs v0.6.0 
h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3x
 github.com/prometheus/procfs v0.6.0/go.mod 
h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
 github.com/prometheus/tsdb v0.7.1/go.mod 
h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
 github.com/rboyer/safeio v0.2.1/go.mod 
h1:Cq/cEPK+YXFn622lsQ0K4KsPZSPtaptHHEldsy7Fmig=
+github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a 
h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod 
h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
 github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03/go.mod 
h1:gRAiPF5C5Nd0eyyRdqIu9qTiFSoZzpTq727b5B8fkkU=
 github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod 
h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
diff --git a/pkg/client/mq/facade.go b/pkg/client/mq/facade.go
new file mode 100644
index 0000000..e177c3c
--- /dev/null
+++ b/pkg/client/mq/facade.go
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mq
+
+import (
+       "context"
+       "strconv"
+       "strings"
+)
+
// ConsumerFacade abstracts a message-queue consumer (e.g. Kafka).
type ConsumerFacade interface {
	// Subscribe starts consuming the topic/partition described by the given
	// options and forwards each received message to the configured consume URL.
	Subscribe(ctx context.Context, option ...Option) error
	// UnSubscribe cancels the consumer goroutines identified by the options.
	UnSubscribe(opts ...Option) error
	// Stop shuts down all consumer goroutines and waits for them to exit.
	Stop()
}
+
// GetConsumerManagerKey builds the consumerManager lookup key for a
// topic/partition pair, e.g. "my-topic.0".
func GetConsumerManagerKey(topic string, partition int32) string {
	return topic + "." + strconv.FormatInt(int64(partition), 10)
}
+
// MQOptions carries the per-call settings shared by consumers and producers.
// TODO: Add rocketmq params
type MQOptions struct {
	Topic      string
	Partition  int64
	ConsumeUrl string
	CheckUrl   string
	Offset     int64
}

// ApplyOpts mutates o by applying every Option in order.
func (o *MQOptions) ApplyOpts(opts ...Option) {
	for i := range opts {
		opts[i](o)
	}
}

// DefaultOptions returns the baseline options: the default topic, partition 1,
// and offset -2 (sarama's "oldest" sentinel).
func DefaultOptions() *MQOptions {
	opt := &MQOptions{}
	opt.Topic = "dubbo-go-pixiu-default-topic"
	opt.Partition = 1
	opt.Offset = -2
	return opt
}

// Option is a functional option that mutates an MQOptions.
type Option func(o *MQOptions)

// WithTopic sets the topic to consume from / produce to.
func WithTopic(t string) Option {
	return func(o *MQOptions) { o.Topic = t }
}

// WithPartition sets the partition to consume from.
func WithPartition(p int64) Option {
	return func(o *MQOptions) { o.Partition = p }
}

// WithConsumeUrl sets the URL messages are forwarded to.
func WithConsumeUrl(ch string) Option {
	return func(o *MQOptions) { o.ConsumeUrl = ch }
}

// WithCheckUrl sets the URL used to health-check the downstream consumer.
func WithCheckUrl(ck string) Option {
	return func(o *MQOptions) { o.CheckUrl = ck }
}

// WithOffset sets the initial consume offset.
func WithOffset(offset int64) Option {
	return func(o *MQOptions) { o.Offset = offset }
}
+
// ProducerFacade abstracts a message-queue producer (e.g. Kafka).
type ProducerFacade interface {
	// Send publishes msgs to the topic selected by opts.
	Send(msgs []string, opts ...Option) error
}
diff --git a/pkg/client/mq/kafka_facade.go b/pkg/client/mq/kafka_facade.go
new file mode 100644
index 0000000..f2abc21
--- /dev/null
+++ b/pkg/client/mq/kafka_facade.go
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mq
+
+import (
+       "bytes"
+       "context"
+       "encoding/json"
+       "net/http"
+       "strconv"
+       "sync"
+       "time"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/filter/event"
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+import (
+       "github.com/Shopify/sarama"
+       perrors "github.com/pkg/errors"
+)
+
+func NewKafkaConsumerFacade(addrs []string, config *sarama.Config) 
(*KafkaConsumerFacade, error) {
+       consumer, err := sarama.NewConsumer(addrs, config)
+       if err != nil {
+               return nil, err
+       }
+
+       // does not set up cookiejar may cause some problem
+       return &KafkaConsumerFacade{consumer: consumer, httpClient: 
&http.Client{Timeout: 5 * time.Second}, done: make(chan struct{})}, nil
+}
+
// KafkaConsumerFacade is the Kafka implementation of ConsumerFacade.
type KafkaConsumerFacade struct {
	consumer sarama.Consumer
	// consumerManager maps "topic.partition" keys to the context cancel
	// functions of the goroutines consuming that partition.
	consumerManager map[string]func()
	// rwLock guards consumerManager.
	rwLock sync.RWMutex
	// httpClient delivers messages and health checks to downstream consumers.
	httpClient *http.Client
	// wg tracks consume/health-check goroutines so Stop can wait for them.
	wg sync.WaitGroup
	// done is closed by Stop to signal all goroutines to exit.
	done chan struct{}
}
+
+func (f *KafkaConsumerFacade) Subscribe(ctx context.Context, opts ...Option) 
error {
+       cOpt := DefaultOptions()
+       cOpt.ApplyOpts(opts...)
+       partConsumer, err := f.consumer.ConsumePartition(cOpt.Topic, 
int32(cOpt.Partition), sarama.OffsetOldest)
+       if err != nil {
+               return err
+       }
+       c, cancel := context.WithCancel(ctx)
+       key := GetConsumerManagerKey(cOpt.Topic, int32(cOpt.Partition))
+       f.rwLock.Lock()
+       defer f.rwLock.Unlock()
+       f.consumerManager[key] = cancel
+       f.wg.Add(2)
+       go f.ConsumePartitions(c, partConsumer, cOpt.ConsumeUrl)
+       go f.checkConsumerIsAlive(c, key, cOpt.CheckUrl)
+       return nil
+}
+
+// ConsumePartitions consume function
+func (f *KafkaConsumerFacade) ConsumePartitions(ctx context.Context, 
partConsumer sarama.PartitionConsumer, consumeUrl string) {
+       defer f.wg.Done()
+
+       for {
+               select {
+               case <-f.done:
+                       logger.Info()
+               case msg := <-partConsumer.Messages():
+                       data, err := json.Marshal(event.MQMsgPush{Msg: 
[]string{string(msg.Value)}})
+                       if err != nil {
+                               logger.Warn()
+                               continue
+                       }
+
+                       req, err := http.NewRequest(http.MethodPost, 
consumeUrl, bytes.NewReader(data))
+                       if err != nil {
+                               logger.Warn()
+                               continue
+                       }
+
+                       for i := 0; i < 5; i++ {
+                               err := func() error {
+                                       resp, err := f.httpClient.Do(req)
+                                       if err != nil {
+                                               return err
+                                       }
+                                       defer resp.Body.Close()
+
+                                       if resp.StatusCode == http.StatusOK {
+                                               return nil
+                                       }
+                                       return perrors.New("failed send msg to 
consumer with status code " + strconv.Itoa(resp.StatusCode))
+                               }()
+                               if err != nil {
+                                       logger.Warn(err.Error())
+                                       time.Sleep(10 * time.Millisecond)
+                               } else {
+                                       break
+                               }
+                       }
+               }
+       }
+}
+
+// checkConsumerIsAlive make sure consumer is alive or would be removed from 
consumer list
+func (f *KafkaConsumerFacade) checkConsumerIsAlive(ctx context.Context, key 
string, checkUrl string) {
+       defer f.wg.Done()
+
+       ticker := time.NewTicker(15 * time.Second)
+       for {
+               select {
+               case <-f.done:
+                       logger.Info()
+               case <-ticker.C:
+                       lastCheck := 0
+                       for i := 0; i < 5; i++ {
+                               err := func() error {
+                                       req, err := 
http.NewRequest(http.MethodGet, checkUrl, bytes.NewReader([]byte{}))
+                                       if err != nil {
+                                               logger.Warn()
+                                       }
+
+                                       resp, err := f.httpClient.Do(req)
+                                       if err != nil {
+                                               logger.Warn()
+                                       }
+                                       defer resp.Body.Close()
+
+                                       lastCheck = resp.StatusCode
+                                       if resp.StatusCode != http.StatusOK {
+                                               return perrors.New("failed 
check consumer alive or not with status code " + strconv.Itoa(resp.StatusCode))
+                                       }
+
+                                       logger.Warn()
+                                       return nil
+                               }()
+                               if err != nil {
+                                       logger.Warn()
+                                       time.Sleep(10 * time.Millisecond)
+                               } else {
+                                       break
+                               }
+                       }
+
+                       if lastCheck != http.StatusOK {
+                               f.consumerManager[key]()
+                               delete(f.consumerManager, key)
+                       }
+
+               }
+       }
+}
+
+func (f *KafkaConsumerFacade) UnSubscribe(opts ...Option) error {
+       cOpt := DefaultOptions()
+       cOpt.ApplyOpts(opts...)
+       key := GetConsumerManagerKey(cOpt.Topic, int32(cOpt.Partition))
+       if cancel, ok := f.consumerManager[key]; !ok {
+               return perrors.New("consumer goroutine not found")
+       } else {
+               cancel()
+               delete(f.consumerManager, key)
+       }
+       return nil
+}
+
// Stop signals every consumer goroutine to exit by closing the done channel,
// then blocks until they have all finished.
func (f *KafkaConsumerFacade) Stop() {
	close(f.done)
	f.wg.Wait()
}
+
// NewKafkaProviderFacade creates a KafkaProducerFacade backed by a sarama
// synchronous producer.
// NOTE(review): the function says "Provider" while the type says "Producer" —
// consider renaming for consistency (kept as-is to avoid breaking callers).
func NewKafkaProviderFacade(addrs []string, config *sarama.Config) (*KafkaProducerFacade, error) {
	producer, err := sarama.NewSyncProducer(addrs, config)
	if err != nil {
		return nil, err
	}
	return &KafkaProducerFacade{producer: producer}, nil
}
+
// KafkaProducerFacade is the Kafka implementation of ProducerFacade.
type KafkaProducerFacade struct {
	producer sarama.SyncProducer
}
+
+func (k *KafkaProducerFacade) Send(msgs []string, opts ...Option) error {
+       pOpt := DefaultOptions()
+       pOpt.ApplyOpts(opts...)
+
+       pMsgs := make([]*sarama.ProducerMessage, 0)
+       for _, msg := range msgs {
+               pMsgs = append(pMsgs, &sarama.ProducerMessage{Topic: 
pOpt.Topic, Value: sarama.StringEncoder(msg)})
+       }
+
+       return k.producer.SendMessages(pMsgs)
+}
diff --git a/pkg/client/mq/mq.go b/pkg/client/mq/mq.go
new file mode 100644
index 0000000..d05e51f
--- /dev/null
+++ b/pkg/client/mq/mq.go
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mq
+
+import (
+       "context"
+       "encoding/json"
+       "io/ioutil"
+       "strings"
+       "sync"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/client"
+       "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+       "github.com/apache/dubbo-go-pixiu/pkg/filter/event"
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+import (
+       perrors "github.com/pkg/errors"
+)
+
var (
	// mqClient is the process-wide singleton returned by NewSingletonMQClient.
	mqClient *Client
	// once guards the one-time initialization of mqClient.
	once sync.Once
)
+
+func NewSingletonMQClient(config event.Config) *Client {
+       if mqClient == nil {
+               once.Do(func() {
+                       var err error
+                       mqClient, err = NewMQClient(config)
+                       if err != nil {
+                               logger.Errorf("create mq client failed, %s", 
err.Error())
+                       }
+               })
+       }
+       return mqClient
+}
+
+func NewMQClient(config event.Config) (*Client, error) {
+       var c *Client
+       ctx := context.Background()
+
+       sc := config.ToSaramaConfig()
+       addrs := strings.Split(config.Endpoints, ",")
+       cf, err := NewKafkaConsumerFacade(addrs, sc)
+       if err != nil {
+               return nil, err
+       }
+       pf, err := NewKafkaProviderFacade(addrs, sc)
+       if err != nil {
+               return nil, err
+       }
+
+       switch config.MqType {
+       case constant.MQTypeKafka:
+               c = &Client{
+                       ctx:            ctx,
+                       consumerFacade: cf,
+                       producerFacade: pf,
+               }
+       case constant.MQTypeRocketMQ:
+               return nil, perrors.New("rocketmq not support")
+       }
+
+       return c, nil
+}
+
// Client bridges pixiu API requests to the configured message queue through
// the consumer/producer facades.
type Client struct {
	// NOTE(review): storing a context in a struct is discouraged in Go; kept
	// for compatibility with existing construction in NewMQClient.
	ctx            context.Context
	consumerFacade ConsumerFacade
	producerFacade ProducerFacade
}
+
// Apply is part of the client interface; not implemented for the mq client yet.
func (c Client) Apply() error {
	panic("implement me")
}

// Close stops all consumer goroutines. Note that the producer facade is not
// closed here.
func (c Client) Close() error {
	c.consumerFacade.Stop()
	return nil
}
+
+func (c Client) Call(req *client.Request) (res interface{}, err error) {
+       body, err := ioutil.ReadAll(req.IngressRequest.Body)
+       if err != nil {
+               return nil, err
+       }
+
+       paths := strings.Split(req.API.Path, "/")
+       if len(paths) < 3 {
+               return nil, perrors.New("failed to send message, broker or 
Topic not found")
+       }
+
+       switch event.MQActionStrToInt[paths[0]] {
+       case event.MQActionPublish:
+               var pReq event.MQProduceRequest
+               err = json.Unmarshal(body, &pReq)
+               if err != nil {
+                       return nil, err
+               }
+               err = c.producerFacade.Send(pReq.Msg, WithTopic(pReq.Topic))
+               if err != nil {
+                       return nil, err
+               }
+       case event.MQActionSubscribe:
+               var cReq event.MQConsumeRequest
+               err = json.Unmarshal(body, &cReq)
+               if err != nil {
+                       return nil, err
+               }
+               err = c.consumerFacade.Subscribe(c.ctx, WithTopic(cReq.Topic), 
WithPartition(cReq.Partition),
+                       WithOffset(cReq.Offset), 
WithConsumeUrl(cReq.ConsumeUrl), WithCheckUrl(cReq.CheckUrl))
+               if err != nil {
+                       return nil, err
+               }
+       case event.MQActionUnSubscribe:
+       case event.MQActionConsumeAck:
+       default:
+               return nil, perrors.New("failed to get mq action")
+       }
+
+       return nil, nil
+}
+
+func (c Client) MapParams(req *client.Request) (reqData interface{}, err 
error) {
+       return nil, perrors.New("map params does not support on mq mqClient")
+}
diff --git a/pkg/common/constant/key.go b/pkg/common/constant/key.go
index d51563d..7e17b44 100644
--- a/pkg/common/constant/key.go
+++ b/pkg/common/constant/key.go
@@ -36,6 +36,7 @@ const (
        TracingFilter          = "dgp.filters.tracing"
        HTTPCorsFilter         = "dgp.filter.http.cors"
        HTTPProxyRewriteFilter = "dgp.filter.http.proxyrewrite"
+       HTTPEventFilter        = "dgp.filter.http.event"
 )
 
 const (
@@ -53,6 +54,11 @@ const (
 )
 
 const (
+       MQTypeKafka    = "kafka"
+       MQTypeRocketMQ = "rocketmq"
+)
+
+const (
        ApplicationKey = "application"
        AppVersionKey  = "app.version"
        ClusterKey     = "cluster"
diff --git a/pkg/filter/event/event.go b/pkg/filter/event/event.go
new file mode 100644
index 0000000..56bfb0e
--- /dev/null
+++ b/pkg/filter/event/event.go
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+       "time"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+       "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
+       "github.com/apache/dubbo-go-pixiu/pkg/context/http"
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+import (
+       "github.com/Shopify/sarama"
+)
+
const (
	// Kind is the kind of the event http filter.
	// (The original comment said "Fallback" — a copy/paste leftover.)
	Kind = constant.HTTPEventFilter
)
+
// init registers the event filter plugin with the http filter registry at
// package load time.
func init() {
	filter.RegisterHttpFilter(&Plugin{})
}
+
type (
	// Plugin is http filter plugin.
	Plugin struct {
	}

	// Filter is http filter instance
	Filter struct {
		cfg *Config
	}

	// Config is the event filter configuration, deserialized from yaml/json.
	Config struct {
		// ClientId overrides the default sarama client id when non-empty.
		ClientId string `yaml:"client_id" json:"client_id"`
		// Endpoints is a comma-separated broker address list.
		Endpoints    string        `yaml:"endpoints" json:"endpoints"`
		MqType       MQType        `yaml:"type" json:"type"`
		KafkaVersion string        `yaml:"kafka_version" json:"kafka_version"`
		Retry        int           `yaml:"retry" json:"retry" default:"5"`
		Timeout      time.Duration `yaml:"timeout" json:"timeout" default:"2s"`
		Offset       string        `yaml:"offset" json:"offset" default:"oldest"` // Offset newest or oldest
	}
)
+
+func (c *Config) ToSaramaConfig() *sarama.Config {
+       config := sarama.NewConfig()
+
+       version, err := sarama.ParseKafkaVersion(c.KafkaVersion)
+       if err != nil {
+               version = sarama.V2_0_0_0
+               logger.Warnf("kafka version is invalid, use sarama.V2_0_0_0 
instead, err: %s", err.Error())
+       }
+       config.Version = version
+
+       offset := sarama.OffsetNewest
+       switch c.Offset {
+       case "newest":
+               offset = sarama.OffsetNewest
+       case "oldest":
+               offset = sarama.OffsetOldest
+       default:
+               logger.Warn("offset is invalid, use oldest instead")
+       }
+       config.Consumer.Offsets.Initial = offset
+
+       config.ClientID = "pixiu-kafka"
+       if c.ClientId != "" {
+               config.ClientID = c.ClientId
+       }
+       logger.Infof("kafka client id is %s", config.ClientID)
+
+       config.Producer.Retry.Max = c.Retry
+       config.Producer.Timeout = c.Timeout
+
+       return config
+}
+
// Kind returns the registered kind name of this filter plugin.
func (p *Plugin) Kind() string {
	return Kind
}

// CreateFilter creates a Filter with an empty config; the config is populated
// later through the struct returned by Config().
func (p *Plugin) CreateFilter() (filter.HttpFilter, error) {
	return &Filter{cfg: &Config{}}, nil
}
+
// PrepareFilterChain appends this filter's Handle to the request's filter chain.
func (f *Filter) PrepareFilterChain(ctx *http.HttpContext) error {
	ctx.AppendFilterFunc(f.Handle)
	return nil
}

// Handle processes an incoming http request (not implemented yet).
func (f *Filter) Handle(ctx *http.HttpContext) {
	// TODO handle request
}

// Apply finishes filter initialization after the config has been loaded.
func (f *Filter) Apply() error {
	// TODO init mq client here
	return nil
}

// Config exposes the config struct for yaml/json deserialization.
func (f *Filter) Config() interface{} {
	return f.cfg
}
diff --git a/pkg/filter/event/msg.go b/pkg/filter/event/msg.go
new file mode 100644
index 0000000..27d8bf4
--- /dev/null
+++ b/pkg/filter/event/msg.go
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
// MQ request action type enum.

// MQType identifies the message-queue implementation (e.g. "kafka").
type MQType string

// MQAction enumerates the operations a request may ask of the mq filter.
type MQAction int

// Give the constants the MQAction type (they were untyped ints); values are
// unchanged, starting at 1 so the zero value means "no/unknown action".
const (
	MQActionPublish     MQAction = 1 + iota // publish messages to a topic
	MQActionSubscribe                       // subscribe a consumer to a topic
	MQActionUnSubscribe                     // remove a subscription
	MQActionConsumeAck                      // acknowledge consumed messages
)
+
// MQActionIntToStr maps each MQAction to its request-path string form.
var MQActionIntToStr = map[MQAction]string{
	MQActionPublish:     "publish",
	MQActionSubscribe:   "subscribe",
	MQActionUnSubscribe: "unsubscribe",
	MQActionConsumeAck:  "consumeack",
}

// MQActionStrToInt is the inverse of MQActionIntToStr; keep the two in sync.
// A missing key yields the zero MQAction, which maps to no defined action.
var MQActionStrToInt = map[string]MQAction{
	"publish":     MQActionPublish,
	"subscribe":   MQActionSubscribe,
	"unsubscribe": MQActionUnSubscribe,
	"consumeack":  MQActionConsumeAck,
}
+
// MQConsumeRequest is the JSON body of a subscribe request.
// NOTE(review): the original comment gave the url format as
// "http://domain/publish/broker/topic", which looks copy/pasted from the
// produce request — confirm the actual subscribe path.
type MQConsumeRequest struct {
	Topic      string `json:"topic"`
	Partition  int64  `json:"partition"` // for kafka
	Offset     int64  `json:"offset"`
	ConsumeUrl string `json:"consume_url"` // not empty when subscribe msg, eg: http://10.0.0.1:11451/consume
	CheckUrl   string `json:"check_url"`   // not empty when subscribe msg, eg: http://10.0.0.1:11451/health
}
+
// MQProduceRequest is the JSON body of a publish request: messages to append
// to the given topic.
type MQProduceRequest struct {
	Topic string   `json:"topic"`
	Msg   []string `json:"msg"`
}

// MQMsgPush is the JSON payload POSTed to a subscriber's consume URL.
type MQMsgPush struct {
	Msg []string `json:"msg"`
}

Reply via email to