cckellogg commented on a change in pull request #743: URL: https://github.com/apache/pulsar-client-go/pull/743#discussion_r828376524
########## File path: pulsar/table_view.go ########## @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "reflect" + "time" + + "github.com/apache/pulsar-client-go/pulsar/log" +) + +// TableViewOptions contains the options for creating a TableView +type TableViewOptions struct { + // Topic specifies the topic this table view will subscribe on. + // This argument is required when constructing the table view. + Topic string + + // Set the interval of updating partitions. Default to 1 minute. + AutoUpdatePartitionsInterval time.Duration + + // Schema represents the schema implementation. + Schema Schema + + // SchemaValueType represents the type of values for the given schema. + SchemaValueType reflect.Type Review comment: Curious: how is the Schema related to the SchemaValueType? ########## File path: pulsar/table_view_impl.go ########## @@ -0,0 +1,261 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "context" + "errors" + "fmt" + "reflect" + "sync" + "time" + + "github.com/apache/pulsar-client-go/pulsar/log" + "github.com/sirupsen/logrus" +) + +type TableViewImpl struct { + client *client + options TableViewOptions + + dataMu sync.Mutex + data map[string]interface{} + + readersMu sync.Mutex + readers map[string]Reader Review comment: Nitpick: should we create a wrapper struct so we don't need to maintain two maps? Maybe something like this? ``` type cancelReader struct { reader Reader cancelFunc context.CancelFunc } ``` ########## File path: pulsar/table_view_impl.go ########## @@ -0,0 +1,261 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "context" + "errors" + "fmt" + "reflect" + "sync" + "time" + + "github.com/apache/pulsar-client-go/pulsar/log" + "github.com/sirupsen/logrus" +) + +type TableViewImpl struct { + client *client + options TableViewOptions + + dataMu sync.Mutex + data map[string]interface{} + + readersMu sync.Mutex + readers map[string]Reader + readerCancelFunc map[string]context.CancelFunc + + listenersMu sync.Mutex + listeners []func(string, interface{}) error + + logger log.Logger + closed bool + closedCh chan struct{} +} + +func newTableView(client *client, options TableViewOptions) (TableView, error) { + if options.Topic == "" { + return nil, newError(TopicNotFound, "topic is required") + } + + if options.Schema != nil && options.SchemaValueType == nil { + return nil, newError(InvalidConfiguration, "SchemaValueType is required when Schema is present") + } + + var logger log.Logger + if options.Logger != nil { + logger = options.Logger + } else { + logger = log.NewLoggerWithLogrus(logrus.StandardLogger()) + } + + if options.AutoUpdatePartitionsInterval == 0 { + options.AutoUpdatePartitionsInterval = time.Minute + } + + tv := TableViewImpl{ + client: client, + options: options, + data: make(map[string]interface{}), + readers: make(map[string]Reader), + readerCancelFunc: make(map[string]context.CancelFunc), + logger: logger, + closedCh: make(chan struct{}), + } + + go tv.periodicPartitionUpdateCheck() + + return &tv, nil +} + +func (tv *TableViewImpl) periodicPartitionUpdateCheck() { + check := func() error { + partitionsArray, err := tv.client.TopicPartitions(tv.options.Topic) + if err != nil { + return fmt.Errorf("tv.client.TopicPartitions(%s) failed: %w", tv.options.Topic, err) + } + + partitions := make(map[string]bool, len(partitionsArray)) + for _, partition := range partitionsArray { + partitions[partition] = true + } + + 
tv.readersMu.Lock() + defer tv.readersMu.Unlock() + + for partition := range partitions { + if _, ok := tv.readers[partition]; !ok { + reader, err := newReader(tv.client, ReaderOptions{ + Topic: partition, + StartMessageID: EarliestMessageID(), + ReadCompacted: true, + // TODO: Pooling? + Schema: tv.options.Schema, + }) + if err != nil { + return fmt.Errorf("create new reader failed for %s: %w", partition, err) + } + for reader.HasNext() { + msg, err := reader.Next(context.Background()) + if err != nil { + tv.logger.Errorf("read next message failed for %s: %w", partition, err) + } + tv.handleMessage(msg) + } + tv.readers[partition] = reader + ctx, cancelFunc := context.WithCancel(context.Background()) + tv.readerCancelFunc[partition] = cancelFunc + go tv.watchReaderForNewMessages(ctx, reader) + } + } + + for partition, reader := range tv.readers { + if _, ok := partitions[partition]; !ok { + tv.readerCancelFunc[partition]() + delete(tv.readerCancelFunc, partition) + + reader.Close() + delete(tv.readers, partition) + } + } + return nil + } + + for { + if err := check(); err != nil { + tv.logger.Errorf("failed to check for changes in number of partitions: %w", err) + } + select { + case <-tv.closedCh: + // If the TableViewImpl has been closed, stop checking for partition updates + return + case <-time.After(tv.options.AutoUpdatePartitionsInterval): + continue + } + } +} + +func (tv *TableViewImpl) Size() int { + tv.dataMu.Lock() + defer tv.dataMu.Unlock() + return len(tv.data) +} + +func (tv *TableViewImpl) IsEmpty() bool { + tv.dataMu.Lock() + defer tv.dataMu.Unlock() + return tv.Size() == 0 +} + +func (tv *TableViewImpl) ContainsKey(key string) bool { + tv.dataMu.Lock() + defer tv.dataMu.Unlock() + _, ok := tv.data[key] + return ok +} + +func (tv *TableViewImpl) Get(key string) interface{} { + tv.dataMu.Lock() + defer tv.dataMu.Unlock() + return tv.data[key] +} + +func (tv *TableViewImpl) Entries() map[string]interface{} { + tv.dataMu.Lock() + defer 
tv.dataMu.Unlock() + return tv.data +} + +func (tv *TableViewImpl) Keys() []string { + tv.dataMu.Lock() + defer tv.dataMu.Unlock() + keys := make([]string, len(tv.data)) + i := 0 + for k := range tv.data { + keys[i] = k + i++ + } + return keys +} + +func (tv *TableViewImpl) ForEach(action func(string, interface{}) error) error { + tv.dataMu.Lock() + defer tv.dataMu.Unlock() + for k, v := range tv.data { + if err := action(k, v); err != nil { + return err + } + } + return nil +} + +func (tv *TableViewImpl) ForEachAndListen(action func(string, interface{}) error) error { + tv.listenersMu.Lock() + defer tv.listenersMu.Unlock() + + if err := tv.ForEach(action); err != nil { + return err + } + + tv.listeners = append(tv.listeners, action) + return nil +} + +func (tv *TableViewImpl) Close() { + tv.readersMu.Lock() + defer tv.readersMu.Unlock() + + if !tv.closed { + tv.closed = true + for _, reader := range tv.readers { + reader.Close() + } + close(tv.closedCh) + } +} + +func (tv *TableViewImpl) handleMessage(msg Message) { + tv.dataMu.Lock() + defer tv.dataMu.Unlock() + + payload := reflect.Indirect(reflect.New(tv.options.SchemaValueType)).Interface() + tv.logger.Errorf("payload: %v %T", payload, payload) Review comment: Should this be debug? Same for line 241? ########## File path: pulsar/table_view.go ########## @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "reflect" + "time" + + "github.com/apache/pulsar-client-go/pulsar/log" +) + +// TableViewOptions contains the options for creating a TableView +type TableViewOptions struct { + // Topic specifies the topic this table view will subscribe on. + // This argument is required when constructing the table view. + Topic string + + // Set the interval of updating partitions. Default to 1 minute. + AutoUpdatePartitionsInterval time.Duration + + // Schema represents the schema implementation. + Schema Schema + + // SchemaValueType represents the type of values for the given schema. + SchemaValueType reflect.Type + + // Configure the logger used by the TableView. + // By default, a wrapped logrus.StandardLogger will be used, namely, + // log.NewLoggerWithLogrus(logrus.StandardLogger()) + Logger log.Logger +} + +// TableView provides a key-value map view of a compacted topic. Messages without keys will be ignored. +type TableView interface { + // Size returns the number of key-value mappings in the TableView. + Size() int + + // IsEmpty returns true if this TableView contains no key-value mappings. + IsEmpty() bool + + // ContainsKey returns true if this TableView contains a mapping for the specified key. + ContainsKey(key string) bool + + // Get returns the value to which the specified key is mapped, or nil if this map contains no mapping for the key. + Get(key string) interface{} + + // Entries returns a map view of the mappings contained in this TableView. 
+ Entries() map[string]interface{} + + // Keys returns a slice of the keys contained in this TableView. + Keys() []string + + // ForEach performs the give action for each entry in this map until all entries have been processed or the action returns an error. + ForEach(func(string, interface{}) error) error + + // ForEachAndListen performs the give action for each entry in this map until all entries have been processed or the action returns an error. + ForEachAndListen(func(string, interface{}) error) error Review comment: What's the difference between `ForEach` and `ForEachAndListen`? ########## File path: pulsar/table_view_impl.go ########## @@ -0,0 +1,261 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package pulsar + +import ( + "context" + "errors" + "fmt" + "reflect" + "sync" + "time" + + "github.com/apache/pulsar-client-go/pulsar/log" + "github.com/sirupsen/logrus" +) + +type TableViewImpl struct { + client *client + options TableViewOptions + + dataMu sync.Mutex + data map[string]interface{} + + readersMu sync.Mutex + readers map[string]Reader + readerCancelFunc map[string]context.CancelFunc + + listenersMu sync.Mutex + listeners []func(string, interface{}) error + + logger log.Logger + closed bool + closedCh chan struct{} +} + +func newTableView(client *client, options TableViewOptions) (TableView, error) { + if options.Topic == "" { + return nil, newError(TopicNotFound, "topic is required") + } + + if options.Schema != nil && options.SchemaValueType == nil { + return nil, newError(InvalidConfiguration, "SchemaValueType is required when Schema is present") + } + + var logger log.Logger + if options.Logger != nil { + logger = options.Logger + } else { + logger = log.NewLoggerWithLogrus(logrus.StandardLogger()) + } + + if options.AutoUpdatePartitionsInterval == 0 { + options.AutoUpdatePartitionsInterval = time.Minute + } + + tv := TableViewImpl{ + client: client, + options: options, + data: make(map[string]interface{}), + readers: make(map[string]Reader), + readerCancelFunc: make(map[string]context.CancelFunc), + logger: logger, + closedCh: make(chan struct{}), + } + + go tv.periodicPartitionUpdateCheck() + + return &tv, nil +} + +func (tv *TableViewImpl) periodicPartitionUpdateCheck() { + check := func() error { + partitionsArray, err := tv.client.TopicPartitions(tv.options.Topic) Review comment: In the newTableView function should we try to first fetch the partitions before starting the periodic updates? The reason I bring this up is this call could fail (like an auth failure) and a user would need to look through logs to figure it out. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
