cckellogg commented on a change in pull request #743:
URL: https://github.com/apache/pulsar-client-go/pull/743#discussion_r828376524



##########
File path: pulsar/table_view.go
##########
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "reflect"
+       "time"
+
+       "github.com/apache/pulsar-client-go/pulsar/log"
+)
+
+// TableViewOptions contains the options for creating a TableView
+type TableViewOptions struct {
+       // Topic specifies the topic this table view will subscribe on.
+       // This argument is required when constructing the table view.
+       Topic string
+
+       // Set the interval of updating partitions. Default to 1 minute.
+       AutoUpdatePartitionsInterval time.Duration
+
+       // Schema represents the schema implementation.
+       Schema Schema
+
+       // SchemaValueType represents the type of values for the given schema.
+       SchemaValueType reflect.Type

Review comment:
       Curious: how is the Schema related to the SchemaValueType?

##########
File path: pulsar/table_view_impl.go
##########
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "reflect"
+       "sync"
+       "time"
+
+       "github.com/apache/pulsar-client-go/pulsar/log"
+       "github.com/sirupsen/logrus"
+)
+
+type TableViewImpl struct {
+       client  *client
+       options TableViewOptions
+
+       dataMu sync.Mutex
+       data   map[string]interface{}
+
+       readersMu        sync.Mutex
+       readers          map[string]Reader

Review comment:
       Nitpick: should we create a wrapper struct so we don't need to maintain
two maps?
   Maybe something like this?
   ```
   type cancelReader struct {
     reader Reader
     cancelFunc context.CancelFunc
   }
   ```

##########
File path: pulsar/table_view_impl.go
##########
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "reflect"
+       "sync"
+       "time"
+
+       "github.com/apache/pulsar-client-go/pulsar/log"
+       "github.com/sirupsen/logrus"
+)
+
+type TableViewImpl struct {
+       client  *client
+       options TableViewOptions
+
+       dataMu sync.Mutex
+       data   map[string]interface{}
+
+       readersMu        sync.Mutex
+       readers          map[string]Reader
+       readerCancelFunc map[string]context.CancelFunc
+
+       listenersMu sync.Mutex
+       listeners   []func(string, interface{}) error
+
+       logger   log.Logger
+       closed   bool
+       closedCh chan struct{}
+}
+
+func newTableView(client *client, options TableViewOptions) (TableView, error) 
{
+       if options.Topic == "" {
+               return nil, newError(TopicNotFound, "topic is required")
+       }
+
+       if options.Schema != nil && options.SchemaValueType == nil {
+               return nil, newError(InvalidConfiguration, "SchemaValueType is 
required when Schema is present")
+       }
+
+       var logger log.Logger
+       if options.Logger != nil {
+               logger = options.Logger
+       } else {
+               logger = log.NewLoggerWithLogrus(logrus.StandardLogger())
+       }
+
+       if options.AutoUpdatePartitionsInterval == 0 {
+               options.AutoUpdatePartitionsInterval = time.Minute
+       }
+
+       tv := TableViewImpl{
+               client:           client,
+               options:          options,
+               data:             make(map[string]interface{}),
+               readers:          make(map[string]Reader),
+               readerCancelFunc: make(map[string]context.CancelFunc),
+               logger:           logger,
+               closedCh:         make(chan struct{}),
+       }
+
+       go tv.periodicPartitionUpdateCheck()
+
+       return &tv, nil
+}
+
+func (tv *TableViewImpl) periodicPartitionUpdateCheck() {
+       check := func() error {
+               partitionsArray, err := 
tv.client.TopicPartitions(tv.options.Topic)
+               if err != nil {
+                       return fmt.Errorf("tv.client.TopicPartitions(%s) 
failed: %w", tv.options.Topic, err)
+               }
+
+               partitions := make(map[string]bool, len(partitionsArray))
+               for _, partition := range partitionsArray {
+                       partitions[partition] = true
+               }
+
+               tv.readersMu.Lock()
+               defer tv.readersMu.Unlock()
+
+               for partition := range partitions {
+                       if _, ok := tv.readers[partition]; !ok {
+                               reader, err := newReader(tv.client, 
ReaderOptions{
+                                       Topic:          partition,
+                                       StartMessageID: EarliestMessageID(),
+                                       ReadCompacted:  true,
+                                       // TODO: Pooling?
+                                       Schema: tv.options.Schema,
+                               })
+                               if err != nil {
+                                       return fmt.Errorf("create new reader 
failed for %s: %w", partition, err)
+                               }
+                               for reader.HasNext() {
+                                       msg, err := 
reader.Next(context.Background())
+                                       if err != nil {
+                                               tv.logger.Errorf("read next 
message failed for %s: %w", partition, err)
+                                       }
+                                       tv.handleMessage(msg)
+                               }
+                               tv.readers[partition] = reader
+                               ctx, cancelFunc := 
context.WithCancel(context.Background())
+                               tv.readerCancelFunc[partition] = cancelFunc
+                               go tv.watchReaderForNewMessages(ctx, reader)
+                       }
+               }
+
+               for partition, reader := range tv.readers {
+                       if _, ok := partitions[partition]; !ok {
+                               tv.readerCancelFunc[partition]()
+                               delete(tv.readerCancelFunc, partition)
+
+                               reader.Close()
+                               delete(tv.readers, partition)
+                       }
+               }
+               return nil
+       }
+
+       for {
+               if err := check(); err != nil {
+                       tv.logger.Errorf("failed to check for changes in number 
of partitions: %w", err)
+               }
+               select {
+               case <-tv.closedCh:
+                       // If the TableViewImpl has been closed, stop checking 
for partition updates
+                       return
+               case <-time.After(tv.options.AutoUpdatePartitionsInterval):
+                       continue
+               }
+       }
+}
+
+func (tv *TableViewImpl) Size() int {
+       tv.dataMu.Lock()
+       defer tv.dataMu.Unlock()
+       return len(tv.data)
+}
+
+func (tv *TableViewImpl) IsEmpty() bool {
+       tv.dataMu.Lock()
+       defer tv.dataMu.Unlock()
+       return tv.Size() == 0
+}
+
+func (tv *TableViewImpl) ContainsKey(key string) bool {
+       tv.dataMu.Lock()
+       defer tv.dataMu.Unlock()
+       _, ok := tv.data[key]
+       return ok
+}
+
+func (tv *TableViewImpl) Get(key string) interface{} {
+       tv.dataMu.Lock()
+       defer tv.dataMu.Unlock()
+       return tv.data[key]
+}
+
+func (tv *TableViewImpl) Entries() map[string]interface{} {
+       tv.dataMu.Lock()
+       defer tv.dataMu.Unlock()
+       return tv.data
+}
+
+func (tv *TableViewImpl) Keys() []string {
+       tv.dataMu.Lock()
+       defer tv.dataMu.Unlock()
+       keys := make([]string, len(tv.data))
+       i := 0
+       for k := range tv.data {
+               keys[i] = k
+               i++
+       }
+       return keys
+}
+
+func (tv *TableViewImpl) ForEach(action func(string, interface{}) error) error 
{
+       tv.dataMu.Lock()
+       defer tv.dataMu.Unlock()
+       for k, v := range tv.data {
+               if err := action(k, v); err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+func (tv *TableViewImpl) ForEachAndListen(action func(string, interface{}) 
error) error {
+       tv.listenersMu.Lock()
+       defer tv.listenersMu.Unlock()
+
+       if err := tv.ForEach(action); err != nil {
+               return err
+       }
+
+       tv.listeners = append(tv.listeners, action)
+       return nil
+}
+
+func (tv *TableViewImpl) Close() {
+       tv.readersMu.Lock()
+       defer tv.readersMu.Unlock()
+
+       if !tv.closed {
+               tv.closed = true
+               for _, reader := range tv.readers {
+                       reader.Close()
+               }
+               close(tv.closedCh)
+       }
+}
+
+func (tv *TableViewImpl) handleMessage(msg Message) {
+       tv.dataMu.Lock()
+       defer tv.dataMu.Unlock()
+
+       payload := 
reflect.Indirect(reflect.New(tv.options.SchemaValueType)).Interface()
+       tv.logger.Errorf("payload: %v %T", payload, payload)

Review comment:
       Should this be logged at debug level rather than error level? Same for line 241?

##########
File path: pulsar/table_view.go
##########
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "reflect"
+       "time"
+
+       "github.com/apache/pulsar-client-go/pulsar/log"
+)
+
+// TableViewOptions contains the options for creating a TableView
+type TableViewOptions struct {
+       // Topic specifies the topic this table view will subscribe on.
+       // This argument is required when constructing the table view.
+       Topic string
+
+       // Set the interval of updating partitions. Default to 1 minute.
+       AutoUpdatePartitionsInterval time.Duration
+
+       // Schema represents the schema implementation.
+       Schema Schema
+
+       // SchemaValueType represents the type of values for the given schema.
+       SchemaValueType reflect.Type
+
+       // Configure the logger used by the TableView.
+       // By default, a wrapped logrus.StandardLogger will be used, namely,
+       // log.NewLoggerWithLogrus(logrus.StandardLogger())
+       Logger log.Logger
+}
+
+// TableView provides a key-value map view of a compacted topic. Messages 
without keys will be ignored.
+type TableView interface {
+       // Size returns the number of key-value mappings in the TableView.
+       Size() int
+
+       // IsEmpty returns true if this TableView contains no key-value 
mappings.
+       IsEmpty() bool
+
+       // ContainsKey returns true if this TableView contains a mapping for 
the specified key.
+       ContainsKey(key string) bool
+
+       // Get returns the value to which the specified key is mapped, or nil 
if this map contains no mapping for the key.
+       Get(key string) interface{}
+
+       // Entries returns a map view of the mappings contained in this 
TableView.
+       Entries() map[string]interface{}
+
+       // Keys returns a slice of the keys contained in this TableView.
+       Keys() []string
+
+       // ForEach performs the give action for each entry in this map until 
all entries have been processed or the action returns an error.
+       ForEach(func(string, interface{}) error) error
+
+       // ForEachAndListen performs the give action for each entry in this map 
until all entries have been processed or the action returns an error.
+       ForEachAndListen(func(string, interface{}) error) error

Review comment:
       What's the difference between `ForEach` and `ForEachAndListen`?

##########
File path: pulsar/table_view_impl.go
##########
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "reflect"
+       "sync"
+       "time"
+
+       "github.com/apache/pulsar-client-go/pulsar/log"
+       "github.com/sirupsen/logrus"
+)
+
+type TableViewImpl struct {
+       client  *client
+       options TableViewOptions
+
+       dataMu sync.Mutex
+       data   map[string]interface{}
+
+       readersMu        sync.Mutex
+       readers          map[string]Reader
+       readerCancelFunc map[string]context.CancelFunc
+
+       listenersMu sync.Mutex
+       listeners   []func(string, interface{}) error
+
+       logger   log.Logger
+       closed   bool
+       closedCh chan struct{}
+}
+
+func newTableView(client *client, options TableViewOptions) (TableView, error) 
{
+       if options.Topic == "" {
+               return nil, newError(TopicNotFound, "topic is required")
+       }
+
+       if options.Schema != nil && options.SchemaValueType == nil {
+               return nil, newError(InvalidConfiguration, "SchemaValueType is 
required when Schema is present")
+       }
+
+       var logger log.Logger
+       if options.Logger != nil {
+               logger = options.Logger
+       } else {
+               logger = log.NewLoggerWithLogrus(logrus.StandardLogger())
+       }
+
+       if options.AutoUpdatePartitionsInterval == 0 {
+               options.AutoUpdatePartitionsInterval = time.Minute
+       }
+
+       tv := TableViewImpl{
+               client:           client,
+               options:          options,
+               data:             make(map[string]interface{}),
+               readers:          make(map[string]Reader),
+               readerCancelFunc: make(map[string]context.CancelFunc),
+               logger:           logger,
+               closedCh:         make(chan struct{}),
+       }
+
+       go tv.periodicPartitionUpdateCheck()
+
+       return &tv, nil
+}
+
+func (tv *TableViewImpl) periodicPartitionUpdateCheck() {
+       check := func() error {
+               partitionsArray, err := 
tv.client.TopicPartitions(tv.options.Topic)

Review comment:
       In the newTableView function, should we first try to fetch the partitions
before starting the periodic updates? The reason I bring this up is that this call
could fail (e.g., an auth failure), and the user would have to look through the logs to
figure it out.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to