This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e1b39d  Add TableView support (#743)
3e1b39d is described below

commit 3e1b39dd6f88aa0a12b949b1d1e064bde6355e22
Author: Ziyao Wei <[email protected]>
AuthorDate: Thu Mar 31 13:42:56 2022 -0400

    Add TableView support (#743)
    
    * Add TableView for Pulsar Golang client
    
    * cckellogg
    
    * Return a copy of the underlying map in tv.Entries()
---
 pulsar/client.go          |   4 +
 pulsar/client_impl.go     |   9 ++
 pulsar/table_view.go      |  78 +++++++++++++
 pulsar/table_view_impl.go | 271 ++++++++++++++++++++++++++++++++++++++++++++++
 pulsar/table_view_test.go |  80 ++++++++++++++
 5 files changed, 442 insertions(+)

diff --git a/pulsar/client.go b/pulsar/client.go
index 8ff152a..f4642c6 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -141,6 +141,10 @@ type Client interface {
        // This method will block until the reader is created successfully.
        CreateReader(ReaderOptions) (Reader, error)
 
+       // CreateTableView creates a table view instance.
+       // This method will block until the table view is created successfully.
+       CreateTableView(TableViewOptions) (TableView, error)
+
        // TopicPartitions Fetches the list of partitions for a given topic
        //
        // If the topic is partitioned, this will return a list of partition 
names.
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 5682927..09b72af 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -176,6 +176,15 @@ func (c *client) CreateReader(options ReaderOptions) 
(Reader, error) {
        return reader, nil
 }
 
+func (c *client) CreateTableView(options TableViewOptions) (TableView, error) {
+       tableView, err := newTableView(c, options)
+       if err != nil {
+               return nil, err
+       }
+       c.handlers.Add(tableView)
+       return tableView, nil
+}
+
 func (c *client) TopicPartitions(topic string) ([]string, error) {
        topicName, err := internal.ParseTopicName(topic)
        if err != nil {
diff --git a/pulsar/table_view.go b/pulsar/table_view.go
new file mode 100644
index 0000000..e566bf0
--- /dev/null
+++ b/pulsar/table_view.go
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "reflect"
+       "time"
+
+       "github.com/apache/pulsar-client-go/pulsar/log"
+)
+
+// TableViewOptions contains the options for creating a TableView
+type TableViewOptions struct {
+       // Topic specifies the topic this table view will subscribe on.
+       // This argument is required when constructing the table view.
+       Topic string
+
+       // Set the interval of updating partitions. Default to 1 minute.
+       AutoUpdatePartitionsInterval time.Duration
+
+       // Schema represents the schema implementation.
+       Schema Schema
+
+       // SchemaValueType represents the type of values for the given schema.
+       SchemaValueType reflect.Type
+
+       // Configure the logger used by the TableView.
+       // By default, a wrapped logrus.StandardLogger will be used, namely,
+       // log.NewLoggerWithLogrus(logrus.StandardLogger())
+       Logger log.Logger
+}
+
+// TableView provides a key-value map view of a compacted topic. Messages 
without keys will be ignored.
+type TableView interface {
+       // Size returns the number of key-value mappings in the TableView.
+       Size() int
+
+       // IsEmpty returns true if this TableView contains no key-value 
mappings.
+       IsEmpty() bool
+
+       // ContainsKey returns true if this TableView contains a mapping for 
the specified key.
+       ContainsKey(key string) bool
+
+       // Get returns the value to which the specified key is mapped, or nil 
if this map contains no mapping for the key.
+       Get(key string) interface{}
+
+       // Entries returns a map view of the mappings contained in this 
TableView.
+       Entries() map[string]interface{}
+
+       // Keys returns a slice of the keys contained in this TableView.
+       Keys() []string
+
+       // ForEach performs the give action for each entry in this map until 
all entries have been processed or the action
+       // returns an error.
+       ForEach(func(string, interface{}) error) error
+
+       // ForEachAndListen performs the give action for each entry in this map 
until all entries have been processed or
+       // the action returns an error.
+       ForEachAndListen(func(string, interface{}) error) error
+
+       // Close closes the table view and releases resources allocated.
+       Close()
+}
diff --git a/pulsar/table_view_impl.go b/pulsar/table_view_impl.go
new file mode 100644
index 0000000..56b9fcb
--- /dev/null
+++ b/pulsar/table_view_impl.go
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "reflect"
+       "sync"
+       "time"
+
+       "github.com/apache/pulsar-client-go/pulsar/log"
+       "github.com/sirupsen/logrus"
+)
+
+type cancelReader struct {
+       reader     Reader
+       cancelFunc context.CancelFunc
+}
+
+type TableViewImpl struct {
+       client  *client
+       options TableViewOptions
+
+       dataMu sync.Mutex
+       data   map[string]interface{}
+
+       readersMu    sync.Mutex
+       cancelRaders map[string]cancelReader
+
+       listenersMu sync.Mutex
+       listeners   []func(string, interface{}) error
+
+       logger   log.Logger
+       closed   bool
+       closedCh chan struct{}
+}
+
+func newTableView(client *client, options TableViewOptions) (TableView, error) 
{
+       if options.Topic == "" {
+               return nil, newError(TopicNotFound, "topic is required")
+       }
+
+       if options.Schema != nil && options.SchemaValueType == nil {
+               return nil, newError(InvalidConfiguration, "SchemaValueType is 
required when Schema is present")
+       }
+
+       var logger log.Logger
+       if options.Logger != nil {
+               logger = options.Logger
+       } else {
+               logger = log.NewLoggerWithLogrus(logrus.StandardLogger())
+       }
+
+       if options.AutoUpdatePartitionsInterval == 0 {
+               options.AutoUpdatePartitionsInterval = time.Minute
+       }
+
+       tv := TableViewImpl{
+               client:       client,
+               options:      options,
+               data:         make(map[string]interface{}),
+               cancelRaders: make(map[string]cancelReader),
+               logger:       logger,
+               closedCh:     make(chan struct{}),
+       }
+
+       // Do an initial round of partition update check to make sure we can 
populate the partition readers
+       if err := tv.partitionUpdateCheck(); err != nil {
+               return nil, err
+       }
+       go tv.periodicPartitionUpdateCheck()
+
+       return &tv, nil
+}
+
+func (tv *TableViewImpl) partitionUpdateCheck() error {
+       partitionsArray, err := tv.client.TopicPartitions(tv.options.Topic)
+       if err != nil {
+               return fmt.Errorf("tv.client.TopicPartitions(%s) failed: %w", 
tv.options.Topic, err)
+       }
+
+       partitions := make(map[string]bool, len(partitionsArray))
+       for _, partition := range partitionsArray {
+               partitions[partition] = true
+       }
+
+       tv.readersMu.Lock()
+       defer tv.readersMu.Unlock()
+
+       for partition, cancelReader := range tv.cancelRaders {
+               if _, ok := partitions[partition]; !ok {
+                       cancelReader.cancelFunc()
+                       cancelReader.reader.Close()
+                       delete(tv.cancelRaders, partition)
+               }
+       }
+
+       for partition := range partitions {
+               if _, ok := tv.cancelRaders[partition]; !ok {
+                       reader, err := newReader(tv.client, ReaderOptions{
+                               Topic:          partition,
+                               StartMessageID: EarliestMessageID(),
+                               ReadCompacted:  true,
+                               // TODO: Pooling?
+                               Schema: tv.options.Schema,
+                       })
+                       if err != nil {
+                               return fmt.Errorf("create new reader failed for 
%s: %w", partition, err)
+                       }
+                       for reader.HasNext() {
+                               msg, err := reader.Next(context.Background())
+                               if err != nil {
+                                       tv.logger.Errorf("read next message 
failed for %s: %w", partition, err)
+                               }
+                               tv.handleMessage(msg)
+                       }
+                       ctx, cancelFunc := 
context.WithCancel(context.Background())
+                       tv.cancelRaders[partition] = cancelReader{
+                               reader:     reader,
+                               cancelFunc: cancelFunc,
+                       }
+                       go tv.watchReaderForNewMessages(ctx, reader)
+               }
+       }
+
+       return nil
+}
+
+func (tv *TableViewImpl) periodicPartitionUpdateCheck() {
+       for {
+               if err := tv.partitionUpdateCheck(); err != nil {
+                       tv.logger.Errorf("failed to check for changes in number 
of partitions: %w", err)
+               }
+               select {
+               case <-tv.closedCh:
+                       // If the TableViewImpl has been closed, stop checking 
for partition updates
+                       return
+               case <-time.After(tv.options.AutoUpdatePartitionsInterval):
+                       continue
+               }
+       }
+}
+
+func (tv *TableViewImpl) Size() int {
+       tv.dataMu.Lock()
+       defer tv.dataMu.Unlock()
+       return len(tv.data)
+}
+
+func (tv *TableViewImpl) IsEmpty() bool {
+       tv.dataMu.Lock()
+       defer tv.dataMu.Unlock()
+       return tv.Size() == 0
+}
+
+func (tv *TableViewImpl) ContainsKey(key string) bool {
+       tv.dataMu.Lock()
+       defer tv.dataMu.Unlock()
+       _, ok := tv.data[key]
+       return ok
+}
+
+func (tv *TableViewImpl) Get(key string) interface{} {
+       tv.dataMu.Lock()
+       defer tv.dataMu.Unlock()
+       return tv.data[key]
+}
+
+func (tv *TableViewImpl) Entries() map[string]interface{} {
+       tv.dataMu.Lock()
+       defer tv.dataMu.Unlock()
+       data := make(map[string]interface{}, len(tv.data))
+       for k, v := range tv.data {
+               data[k] = v
+       }
+       return tv.data
+}
+
+func (tv *TableViewImpl) Keys() []string {
+       tv.dataMu.Lock()
+       defer tv.dataMu.Unlock()
+       keys := make([]string, len(tv.data))
+       i := 0
+       for k := range tv.data {
+               keys[i] = k
+               i++
+       }
+       return keys
+}
+
+func (tv *TableViewImpl) ForEach(action func(string, interface{}) error) error 
{
+       tv.dataMu.Lock()
+       defer tv.dataMu.Unlock()
+       for k, v := range tv.data {
+               if err := action(k, v); err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+func (tv *TableViewImpl) ForEachAndListen(action func(string, interface{}) 
error) error {
+       tv.listenersMu.Lock()
+       defer tv.listenersMu.Unlock()
+
+       if err := tv.ForEach(action); err != nil {
+               return err
+       }
+
+       tv.listeners = append(tv.listeners, action)
+       return nil
+}
+
+func (tv *TableViewImpl) Close() {
+       tv.readersMu.Lock()
+       defer tv.readersMu.Unlock()
+
+       if !tv.closed {
+               tv.closed = true
+               for _, cancelReader := range tv.cancelRaders {
+                       cancelReader.reader.Close()
+               }
+               close(tv.closedCh)
+       }
+}
+
+func (tv *TableViewImpl) handleMessage(msg Message) {
+       tv.dataMu.Lock()
+       defer tv.dataMu.Unlock()
+
+       payload := 
reflect.Indirect(reflect.New(tv.options.SchemaValueType)).Interface()
+       if err := msg.GetSchemaValue(&payload); err != nil {
+               tv.logger.Errorf("msg.GetSchemaValue() failed with %w; msg is 
%v", msg, err)
+       }
+       tv.data[msg.Key()] = payload
+       for _, listener := range tv.listeners {
+               if err := listener(msg.Key(), payload); err != nil {
+                       tv.logger.Errorf("table view listener failed for %v: 
%w", msg, err)
+               }
+       }
+}
+
+func (tv *TableViewImpl) watchReaderForNewMessages(ctx context.Context, reader 
Reader) {
+       for {
+               msg, err := reader.Next(ctx)
+               if err != nil {
+                       tv.logger.Errorf("read next message failed for %s: %w", 
reader.Topic(), err)
+               }
+               if errors.Is(err, context.Canceled) {
+                       return
+               }
+               tv.handleMessage(msg)
+       }
+}
diff --git a/pulsar/table_view_test.go b/pulsar/table_view_test.go
new file mode 100644
index 0000000..829187b
--- /dev/null
+++ b/pulsar/table_view_test.go
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "context"
+       "fmt"
+       "reflect"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestTableView(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.NoError(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       schema := NewStringSchema(nil)
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:  topic,
+               Schema: schema,
+       })
+       assert.NoError(t, err)
+       defer producer.Close()
+
+       numMsg := 10
+       valuePrefix := "hello pulsar: "
+       for i := 0; i < numMsg; i++ {
+               key := fmt.Sprintf("%d", i)
+               t.Log(key)
+               _, err = producer.Send(context.Background(), &ProducerMessage{
+                       Key:   key,
+                       Value: fmt.Sprintf(valuePrefix + key),
+               })
+               assert.NoError(t, err)
+       }
+
+       // create table view
+       v := ""
+       tv, err := client.CreateTableView(TableViewOptions{
+               Topic:           topic,
+               Schema:          schema,
+               SchemaValueType: reflect.TypeOf(&v),
+       })
+       assert.NoError(t, err)
+       defer tv.Close()
+
+       // Wait until tv receives all messages
+       for tv.Size() < 10 {
+               time.Sleep(time.Second * 1)
+               t.Logf("TableView number of elements: %d", tv.Size())
+       }
+
+       for k, v := range tv.Entries() {
+               assert.Equal(t, valuePrefix+k, *(v.(*string)))
+       }
+}

Reply via email to