This is an automated email from the ASF dual-hosted git repository.
ztelur pushed a commit to branch eventmesh
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/eventmesh by this push:
new 3b37a3c Feat: event mesh (#286)
3b37a3c is described below
commit 3b37a3c116cffc2aba6699cd526a56e1ee2c2429
Author: MasterKenway <[email protected]>
AuthorDate: Sat Oct 23 22:36:17 2021 +0800
Feat: event mesh (#286)
* feat: mq facade definition
* feat: mq facade definition (kafka impl)
* feat: mq type definition
* feat: msg definition
* feat: mq facade definition (kafka impl)
* feat: init mq filter
* feat: init mq client
* feat: make params config with Options implement with builder mode
* fix: fit dynamic options
* feat: add producer facade
* docs: add lic header
* feat: call facade
* fix: resolve import cycle
* fix: use chan as stop signal
* fix: go lint
---
go.mod | 1 +
go.sum | 6 ++
pkg/client/mq/facade.go | 96 +++++++++++++++++++
pkg/client/mq/kafka_facade.go | 211 ++++++++++++++++++++++++++++++++++++++++++
pkg/client/mq/mq.go | 145 +++++++++++++++++++++++++++++
pkg/common/constant/key.go | 6 ++
pkg/filter/event/event.go | 122 ++++++++++++++++++++++++
pkg/filter/event/msg.go | 63 +++++++++++++
8 files changed, 650 insertions(+)
diff --git a/go.mod b/go.mod
index b227777..6443cfd 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module github.com/apache/dubbo-go-pixiu
go 1.14
require (
+ github.com/Shopify/sarama v1.19.0
github.com/alibaba/sentinel-golang v1.0.2
github.com/apache/dubbo-go v1.5.7
github.com/apache/dubbo-go-hessian2 v1.9.3
diff --git a/go.sum b/go.sum
index 81bc996..8eb4eea 100644
--- a/go.sum
+++ b/go.sum
@@ -65,6 +65,7 @@ github.com/PuerkitoBio/purell v1.0.0/go.mod
h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt
github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod
h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/RoaringBitmap/roaring v0.6.1
h1:O36Tdaj1Fi/zyr25shTHwlQPGdq53+u4WkM08AOEjiE=
github.com/RoaringBitmap/roaring v0.6.1/go.mod
h1:WZ83fjBF/7uBHi6QoFyfGL4+xuV4Qn+xFkm4+vSzrhE=
+github.com/Shopify/sarama v1.19.0
h1:9oksLxC6uxVPHPVYUmq6xhr1BOF/hHobWH2UzO67z1s=
github.com/Shopify/sarama v1.19.0/go.mod
h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod
h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod
h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
@@ -195,8 +196,11 @@ github.com/dubbogo/jsonparser v1.0.1/go.mod
h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWb
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod
h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0
h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod
h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
+github.com/eapache/go-resiliency v1.1.0
h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
github.com/eapache/go-resiliency v1.1.0/go.mod
h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
+github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21
h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod
h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
+github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod
h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/edsrzf/mmap-go v1.0.0/go.mod
h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/elazarl/go-bindata-assetfs
v0.0.0-20160803192304-e1a2a7ec64b0/go.mod
h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4=
@@ -616,6 +620,7 @@ github.com/performancecopilot/speed
v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod
h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/philhofer/fwd v1.0.0/go.mod
h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod
h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
+github.com/pierrec/lz4 v2.0.5+incompatible
h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod
h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -669,6 +674,7 @@ github.com/prometheus/procfs v0.6.0
h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3x
github.com/prometheus/procfs v0.6.0/go.mod
h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/tsdb v0.7.1/go.mod
h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rboyer/safeio v0.2.1/go.mod
h1:Cq/cEPK+YXFn622lsQ0K4KsPZSPtaptHHEldsy7Fmig=
+github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod
h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03/go.mod
h1:gRAiPF5C5Nd0eyyRdqIu9qTiFSoZzpTq727b5B8fkkU=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod
h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
diff --git a/pkg/client/mq/facade.go b/pkg/client/mq/facade.go
new file mode 100644
index 0000000..e177c3c
--- /dev/null
+++ b/pkg/client/mq/facade.go
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mq
+
+import (
+ "context"
+ "strconv"
+ "strings"
+)
+
+type ConsumerFacade interface {
+ // Subscribe message with specified broker and Topic, then handle msg
with handler which send msg to real consumers
+ Subscribe(ctx context.Context, option ...Option) error
+ UnSubscribe(opts ...Option) error
+ Stop()
+}
+
+func GetConsumerManagerKey(topic string, partition int32) string {
+ return strings.Join([]string{topic, strconv.Itoa(int(partition))}, ".")
+}
+
+// MQOptions Consumer options
+// TODO: Add rocketmq params
+type MQOptions struct {
+ Topic string
+ Partition int64
+ ConsumeUrl string
+ CheckUrl string
+ Offset int64
+}
+
+func (o *MQOptions) ApplyOpts(opts ...Option) {
+ for _, opt := range opts {
+ opt(o)
+ }
+}
+
+func DefaultOptions() *MQOptions {
+ return &MQOptions{
+ Topic: "dubbo-go-pixiu-default-topic",
+ Partition: 1,
+ Offset: -2,
+ }
+}
+
+type Option func(o *MQOptions)
+
+func WithTopic(t string) Option {
+ return func(o *MQOptions) {
+ o.Topic = t
+ }
+}
+
+func WithPartition(p int64) Option {
+ return func(o *MQOptions) {
+ o.Partition = p
+ }
+}
+
+func WithConsumeUrl(ch string) Option {
+ return func(o *MQOptions) {
+ o.ConsumeUrl = ch
+ }
+}
+
+func WithCheckUrl(ck string) Option {
+ return func(o *MQOptions) {
+ o.CheckUrl = ck
+ }
+}
+
+func WithOffset(offset int64) Option {
+ return func(o *MQOptions) {
+ o.Offset = offset
+ }
+}
+
+type ProducerFacade interface {
+ // Send msg to specified broker and Topic
+ Send(msgs []string, opts ...Option) error
+}
diff --git a/pkg/client/mq/kafka_facade.go b/pkg/client/mq/kafka_facade.go
new file mode 100644
index 0000000..f2abc21
--- /dev/null
+++ b/pkg/client/mq/kafka_facade.go
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mq
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "net/http"
+ "strconv"
+ "sync"
+ "time"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/filter/event"
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+import (
+ "github.com/Shopify/sarama"
+ perrors "github.com/pkg/errors"
+)
+
+func NewKafkaConsumerFacade(addrs []string, config *sarama.Config)
(*KafkaConsumerFacade, error) {
+ consumer, err := sarama.NewConsumer(addrs, config)
+ if err != nil {
+ return nil, err
+ }
+
+ // does not set up cookiejar may cause some problem
+ return &KafkaConsumerFacade{consumer: consumer, httpClient:
&http.Client{Timeout: 5 * time.Second}, done: make(chan struct{})}, nil
+}
+
+type KafkaConsumerFacade struct {
+ consumer sarama.Consumer
+ consumerManager map[string]func()
+ rwLock sync.RWMutex
+ httpClient *http.Client
+ wg sync.WaitGroup
+ done chan struct{}
+}
+
+func (f *KafkaConsumerFacade) Subscribe(ctx context.Context, opts ...Option)
error {
+ cOpt := DefaultOptions()
+ cOpt.ApplyOpts(opts...)
+ partConsumer, err := f.consumer.ConsumePartition(cOpt.Topic,
int32(cOpt.Partition), sarama.OffsetOldest)
+ if err != nil {
+ return err
+ }
+ c, cancel := context.WithCancel(ctx)
+ key := GetConsumerManagerKey(cOpt.Topic, int32(cOpt.Partition))
+ f.rwLock.Lock()
+ defer f.rwLock.Unlock()
+ f.consumerManager[key] = cancel
+ f.wg.Add(2)
+ go f.ConsumePartitions(c, partConsumer, cOpt.ConsumeUrl)
+ go f.checkConsumerIsAlive(c, key, cOpt.CheckUrl)
+ return nil
+}
+
+// ConsumePartitions consume function
+func (f *KafkaConsumerFacade) ConsumePartitions(ctx context.Context,
partConsumer sarama.PartitionConsumer, consumeUrl string) {
+ defer f.wg.Done()
+
+ for {
+ select {
+ case <-f.done:
+ logger.Info()
+ case msg := <-partConsumer.Messages():
+ data, err := json.Marshal(event.MQMsgPush{Msg:
[]string{string(msg.Value)}})
+ if err != nil {
+ logger.Warn()
+ continue
+ }
+
+ req, err := http.NewRequest(http.MethodPost,
consumeUrl, bytes.NewReader(data))
+ if err != nil {
+ logger.Warn()
+ continue
+ }
+
+ for i := 0; i < 5; i++ {
+ err := func() error {
+ resp, err := f.httpClient.Do(req)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode == http.StatusOK {
+ return nil
+ }
+ return perrors.New("failed send msg to
consumer with status code " + strconv.Itoa(resp.StatusCode))
+ }()
+ if err != nil {
+ logger.Warn(err.Error())
+ time.Sleep(10 * time.Millisecond)
+ } else {
+ break
+ }
+ }
+ }
+ }
+}
+
+// checkConsumerIsAlive make sure consumer is alive or would be removed from
consumer list
+func (f *KafkaConsumerFacade) checkConsumerIsAlive(ctx context.Context, key
string, checkUrl string) {
+ defer f.wg.Done()
+
+ ticker := time.NewTicker(15 * time.Second)
+ for {
+ select {
+ case <-f.done:
+ logger.Info()
+ case <-ticker.C:
+ lastCheck := 0
+ for i := 0; i < 5; i++ {
+ err := func() error {
+ req, err :=
http.NewRequest(http.MethodGet, checkUrl, bytes.NewReader([]byte{}))
+ if err != nil {
+ logger.Warn()
+ }
+
+ resp, err := f.httpClient.Do(req)
+ if err != nil {
+ logger.Warn()
+ }
+ defer resp.Body.Close()
+
+ lastCheck = resp.StatusCode
+ if resp.StatusCode != http.StatusOK {
+ return perrors.New("failed
check consumer alive or not with status code " + strconv.Itoa(resp.StatusCode))
+ }
+
+ logger.Warn()
+ return nil
+ }()
+ if err != nil {
+ logger.Warn()
+ time.Sleep(10 * time.Millisecond)
+ } else {
+ break
+ }
+ }
+
+ if lastCheck != http.StatusOK {
+ f.consumerManager[key]()
+ delete(f.consumerManager, key)
+ }
+
+ }
+ }
+}
+
+func (f *KafkaConsumerFacade) UnSubscribe(opts ...Option) error {
+ cOpt := DefaultOptions()
+ cOpt.ApplyOpts(opts...)
+ key := GetConsumerManagerKey(cOpt.Topic, int32(cOpt.Partition))
+ if cancel, ok := f.consumerManager[key]; !ok {
+ return perrors.New("consumer goroutine not found")
+ } else {
+ cancel()
+ delete(f.consumerManager, key)
+ }
+ return nil
+}
+
+func (f *KafkaConsumerFacade) Stop() {
+ close(f.done)
+ f.wg.Wait()
+}
+
+func NewKafkaProviderFacade(addrs []string, config *sarama.Config)
(*KafkaProducerFacade, error) {
+ producer, err := sarama.NewSyncProducer(addrs, config)
+ if err != nil {
+ return nil, err
+ }
+ return &KafkaProducerFacade{producer: producer}, nil
+}
+
+type KafkaProducerFacade struct {
+ producer sarama.SyncProducer
+}
+
+func (k *KafkaProducerFacade) Send(msgs []string, opts ...Option) error {
+ pOpt := DefaultOptions()
+ pOpt.ApplyOpts(opts...)
+
+ pMsgs := make([]*sarama.ProducerMessage, 0)
+ for _, msg := range msgs {
+ pMsgs = append(pMsgs, &sarama.ProducerMessage{Topic:
pOpt.Topic, Value: sarama.StringEncoder(msg)})
+ }
+
+ return k.producer.SendMessages(pMsgs)
+}
diff --git a/pkg/client/mq/mq.go b/pkg/client/mq/mq.go
new file mode 100644
index 0000000..d05e51f
--- /dev/null
+++ b/pkg/client/mq/mq.go
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mq
+
+import (
+ "context"
+ "encoding/json"
+ "io/ioutil"
+ "strings"
+ "sync"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/client"
+ "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+ "github.com/apache/dubbo-go-pixiu/pkg/filter/event"
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+import (
+ perrors "github.com/pkg/errors"
+)
+
+var (
+ mqClient *Client
+ once sync.Once
+)
+
+func NewSingletonMQClient(config event.Config) *Client {
+ if mqClient == nil {
+ once.Do(func() {
+ var err error
+ mqClient, err = NewMQClient(config)
+ if err != nil {
+ logger.Errorf("create mq client failed, %s",
err.Error())
+ }
+ })
+ }
+ return mqClient
+}
+
+func NewMQClient(config event.Config) (*Client, error) {
+ var c *Client
+ ctx := context.Background()
+
+ sc := config.ToSaramaConfig()
+ addrs := strings.Split(config.Endpoints, ",")
+ cf, err := NewKafkaConsumerFacade(addrs, sc)
+ if err != nil {
+ return nil, err
+ }
+ pf, err := NewKafkaProviderFacade(addrs, sc)
+ if err != nil {
+ return nil, err
+ }
+
+ switch config.MqType {
+ case constant.MQTypeKafka:
+ c = &Client{
+ ctx: ctx,
+ consumerFacade: cf,
+ producerFacade: pf,
+ }
+ case constant.MQTypeRocketMQ:
+ return nil, perrors.New("rocketmq not support")
+ }
+
+ return c, nil
+}
+
+type Client struct {
+ ctx context.Context
+ consumerFacade ConsumerFacade
+ producerFacade ProducerFacade
+}
+
+func (c Client) Apply() error {
+ panic("implement me")
+}
+
+func (c Client) Close() error {
+ c.consumerFacade.Stop()
+ return nil
+}
+
+func (c Client) Call(req *client.Request) (res interface{}, err error) {
+ body, err := ioutil.ReadAll(req.IngressRequest.Body)
+ if err != nil {
+ return nil, err
+ }
+
+ paths := strings.Split(req.API.Path, "/")
+ if len(paths) < 3 {
+ return nil, perrors.New("failed to send message, broker or
Topic not found")
+ }
+
+ switch event.MQActionStrToInt[paths[0]] {
+ case event.MQActionPublish:
+ var pReq event.MQProduceRequest
+ err = json.Unmarshal(body, &pReq)
+ if err != nil {
+ return nil, err
+ }
+ err = c.producerFacade.Send(pReq.Msg, WithTopic(pReq.Topic))
+ if err != nil {
+ return nil, err
+ }
+ case event.MQActionSubscribe:
+ var cReq event.MQConsumeRequest
+ err = json.Unmarshal(body, &cReq)
+ if err != nil {
+ return nil, err
+ }
+ err = c.consumerFacade.Subscribe(c.ctx, WithTopic(cReq.Topic),
WithPartition(cReq.Partition),
+ WithOffset(cReq.Offset),
WithConsumeUrl(cReq.ConsumeUrl), WithCheckUrl(cReq.CheckUrl))
+ if err != nil {
+ return nil, err
+ }
+ case event.MQActionUnSubscribe:
+ case event.MQActionConsumeAck:
+ default:
+ return nil, perrors.New("failed to get mq action")
+ }
+
+ return nil, nil
+}
+
+func (c Client) MapParams(req *client.Request) (reqData interface{}, err
error) {
+ return nil, perrors.New("map params does not support on mq mqClient")
+}
diff --git a/pkg/common/constant/key.go b/pkg/common/constant/key.go
index d51563d..7e17b44 100644
--- a/pkg/common/constant/key.go
+++ b/pkg/common/constant/key.go
@@ -36,6 +36,7 @@ const (
TracingFilter = "dgp.filters.tracing"
HTTPCorsFilter = "dgp.filter.http.cors"
HTTPProxyRewriteFilter = "dgp.filter.http.proxyrewrite"
+ HTTPEventFilter = "dgp.filter.http.event"
)
const (
@@ -53,6 +54,11 @@ const (
)
const (
+ MQTypeKafka = "kafka"
+ MQTypeRocketMQ = "rocketmq"
+)
+
+const (
ApplicationKey = "application"
AppVersionKey = "app.version"
ClusterKey = "cluster"
diff --git a/pkg/filter/event/event.go b/pkg/filter/event/event.go
new file mode 100644
index 0000000..56bfb0e
--- /dev/null
+++ b/pkg/filter/event/event.go
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+ "time"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+ "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
+ "github.com/apache/dubbo-go-pixiu/pkg/context/http"
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+import (
+ "github.com/Shopify/sarama"
+)
+
+const (
+ // Kind is the kind of Fallback.
+ Kind = constant.HTTPEventFilter
+)
+
+func init() {
+ filter.RegisterHttpFilter(&Plugin{})
+}
+
+type (
+ // Plugin is http filter plugin.
+ Plugin struct {
+ }
+
+ // Filter is http filter instance
+ Filter struct {
+ cfg *Config
+ }
+
+ Config struct {
+ ClientId string `yaml:"client_id" json:"client_id"`
+ Endpoints string `yaml:"endpoints" json:"endpoints"`
+ MqType MQType `yaml:"type" json:"type"`
+ KafkaVersion string `yaml:"kafka_version"
json:"kafka_version"`
+ Retry int `yaml:"retry" json:"retry"
default:"5"`
+ Timeout time.Duration `yaml:"timeout" json:"timeout"
default:"2s"`
+ Offset string `yaml:"offset" json:"offset"
default:"oldest"` // Offset newest or oldest
+ }
+)
+
+func (c *Config) ToSaramaConfig() *sarama.Config {
+ config := sarama.NewConfig()
+
+ version, err := sarama.ParseKafkaVersion(c.KafkaVersion)
+ if err != nil {
+ version = sarama.V2_0_0_0
+ logger.Warnf("kafka version is invalid, use sarama.V2_0_0_0
instead, err: %s", err.Error())
+ }
+ config.Version = version
+
+ offset := sarama.OffsetNewest
+ switch c.Offset {
+ case "newest":
+ offset = sarama.OffsetNewest
+ case "oldest":
+ offset = sarama.OffsetOldest
+ default:
+ logger.Warn("offset is invalid, use oldest instead")
+ }
+ config.Consumer.Offsets.Initial = offset
+
+ config.ClientID = "pixiu-kafka"
+ if c.ClientId != "" {
+ config.ClientID = c.ClientId
+ }
+ logger.Infof("kafka client id is %s", config.ClientID)
+
+ config.Producer.Retry.Max = c.Retry
+ config.Producer.Timeout = c.Timeout
+
+ return config
+}
+
+func (p *Plugin) Kind() string {
+ return Kind
+}
+
+func (p *Plugin) CreateFilter() (filter.HttpFilter, error) {
+ return &Filter{cfg: &Config{}}, nil
+}
+
+func (f *Filter) PrepareFilterChain(ctx *http.HttpContext) error {
+ ctx.AppendFilterFunc(f.Handle)
+ return nil
+}
+
+func (f *Filter) Handle(ctx *http.HttpContext) {
+ // TODO handle request
+}
+
+func (f *Filter) Apply() error {
+ // TODO init mq client here
+ return nil
+}
+
+func (f *Filter) Config() interface{} {
+ return f.cfg
+}
diff --git a/pkg/filter/event/msg.go b/pkg/filter/event/msg.go
new file mode 100644
index 0000000..27d8bf4
--- /dev/null
+++ b/pkg/filter/event/msg.go
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+// Msq Request Action Type Enum
+
+type MQType string
+
+type MQAction int
+
+const (
+ MQActionPublish = 1 + iota
+ MQActionSubscribe
+ MQActionUnSubscribe
+ MQActionConsumeAck
+)
+
+var MQActionIntToStr = map[MQAction]string{
+ MQActionPublish: "publish",
+ MQActionSubscribe: "subscribe",
+ MQActionUnSubscribe: "unsubscribe",
+ MQActionConsumeAck: "consumeack",
+}
+
+var MQActionStrToInt = map[string]MQAction{
+ "publish": MQActionPublish,
+ "subscribe": MQActionSubscribe,
+ "unsubscribe": MQActionUnSubscribe,
+ "consumeack": MQActionConsumeAck,
+}
+
+// MQConsumeRequest url format http://domain/publish/broker/topic
+type MQConsumeRequest struct {
+ Topic string `json:"topic"`
+ Partition int64 `json:"partition"` // for kafka
+ Offset int64 `json:"offset"`
+ ConsumeUrl string `json:"consume_url"` // not empty when subscribe msg,
eg: http://10.0.0.1:11451/consume
+ CheckUrl string `json:"check_url"` // not empty when subscribe msg,
eg: http://10.0.0.1:11451/health
+}
+
+type MQProduceRequest struct {
+ Topic string `json:"topic"`
+ Msg []string `json:"msg"`
+}
+
+type MQMsgPush struct {
+ Msg []string `json:"msg"`
+}