RobertIndie commented on code in PR #955:
URL: https://github.com/apache/pulsar-client-go/pull/955#discussion_r1119711614


##########
pulsar/internal/memory_limit_controller_test.go:
##########
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+       "context"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestLimit(t *testing.T) {
+
+       mlc := NewMemoryLimitController(100)
+
+       for i := 0; i < 101; i++ {
+               assert.True(t, mlc.TryReserveMemory(1))
+       }
+
+       assert.False(t, mlc.TryReserveMemory(1))
+       assert.Equal(t, int64(101), mlc.CurrentUsage())
+       assert.Equal(t, 1.01, mlc.CurrentUsagePercent())

Review Comment:
   Can we check equality for the float type here? It looks like `assert.Equal` doesn't 
deal with the float case.



##########
pulsar/internal/memory_limit_controller.go:
##########
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+       "context"
+       "sync"
+       "sync/atomic"
+)
+
+type MemoryLimitController interface {
+       ReserveMemory(ctx context.Context, size int64) bool
+       TryReserveMemory(size int64) bool
+       ForceReserveMemory(size int64)
+       ReleaseMemory(size int64)
+       CurrentUsage() int64
+       CurrentUsagePercent() float64
+       IsMemoryLimited() bool
+}
+
+type memoryLimitController struct {
+       limit        int64
+       chCond       ChCond
+       currentUsage int64
+}
+
+func NewMemoryLimitController(limit int64) MemoryLimitController {
+       mlc := &memoryLimitController{
+               limit:  limit,
+               chCond: *NewCond(&sync.Mutex{}),

Review Comment:
   This looks like it will result in the value being copied. Better to use a pointer type here.



##########
pulsar/internal/channel_cond_test.go:
##########
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+       "context"
+       "sync"
+       "testing"
+       "time"
+)
+
+func TestChCond(t *testing.T) {
+       cond := NewCond(&sync.Mutex{})
+       wg := sync.WaitGroup{}
+       wg.Add(1)
+       go func() {
+               cond.L.Lock()
+               cond.Wait()
+               cond.L.Unlock()
+               wg.Done()
+       }()
+       time.Sleep(10 * time.Millisecond)
+       cond.Broadcast()
+       wg.Wait()
+}
+
+func TestChCondWithContext(t *testing.T) {
+       cond := NewCond(&sync.Mutex{})
+       wg := sync.WaitGroup{}
+       ctx, cancel := context.WithCancel(context.Background())
+       wg.Add(1)
+       go func() {
+               cond.L.Lock()

Review Comment:
   It would be better to add `Lock` and `Unlock` methods to `ChCond`.



##########
pulsar/producer_partition.go:
##########
@@ -494,14 +496,15 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
 
        // The block chan must be closed when returned with exception
        defer request.stopBlock()
-       if !p.canAddToQueue(request) {
+       if !p.canAddToQueue(request, uncompressedPayloadSize) {
                return
        }
 
        if p.options.DisableMultiSchema {
                if msg.Schema != nil && p.options.Schema != nil &&
                        msg.Schema.GetSchemaInfo().hash() != 
p.options.Schema.GetSchemaInfo().hash() {
                        p.publishSemaphore.Release()
+                       p.client.memLimit.ReleaseMemory(uncompressedPayloadSize)
                        request.callback(nil, request.msg, fmt.Errorf("msg 
schema can not match with producer schema"))
                        p.log.WithError(err).Errorf("The producer %s of the 
topic %s is disabled the `MultiSchema`", p.producerName, p.topic)

Review Comment:
   How about we combine these lines into one function like 
`completeCallbackAndReleaseSemaphore` in Java? This will help subsequent 
developers not miss releasing any resources when adding new logic.



##########
pulsar/producer_test.go:
##########
@@ -1779,3 +1779,126 @@ func TestWaitForExclusiveProducer(t *testing.T) {
        producer1.Close()
        wg.Wait()
 }
+
+func TestMemLimitRejectProducerMessages(t *testing.T) {
+
+       c, err := NewClient(ClientOptions{
+               URL:              serviceURL,
+               MemoryLimitBytes: 100 * 1024,
+       })
+       assert.NoError(t, err)
+       defer c.Close()
+
+       topicName := newTopicName()
+       producer1, _ := c.CreateProducer(ProducerOptions{
+               Topic:                   topicName,
+               DisableBlockIfQueueFull: true,
+               DisableBatching:         false,
+               BatchingMaxPublishDelay: 100 * time.Second,
+               SendTimeout:             2 * time.Second,
+       })
+
+       producer2, _ := c.CreateProducer(ProducerOptions{
+               Topic:                   topicName,
+               DisableBlockIfQueueFull: true,
+               DisableBatching:         false,
+               BatchingMaxPublishDelay: 100 * time.Second,
+               SendTimeout:             2 * time.Second,
+       })
+
+       n := 101
+       for i := 0; i < n/2; i++ {
+               producer1.SendAsync(context.Background(), &ProducerMessage{
+                       Payload: make([]byte, 1024),
+               }, func(id MessageID, message *ProducerMessage, e error) {})
+
+               producer2.SendAsync(context.Background(), &ProducerMessage{
+                       Payload: make([]byte, 1024),
+               }, func(id MessageID, message *ProducerMessage, e error) {})
+       }
+       // Last message in order to reach the limit
+       producer1.SendAsync(context.Background(), &ProducerMessage{
+               Payload: make([]byte, 1024),
+       }, func(id MessageID, message *ProducerMessage, e error) {})
+       time.Sleep(100 * time.Millisecond)
+       assert.Equal(t, int64(n*1024), c.(*client).memLimit.CurrentUsage())
+
+       _, err = producer1.Send(context.Background(), &ProducerMessage{
+               Payload: make([]byte, 1024),
+       })
+       assert.Error(t, err)
+       assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
+
+       _, err = producer2.Send(context.Background(), &ProducerMessage{
+               Payload: make([]byte, 1024),
+       })
+       assert.Error(t, err)
+       assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
+
+       // flush pending msg
+       producer1.Flush()
+       producer2.Flush()

Review Comment:
   Lint warning here: `Unhandled error`.
   Same for line 1897.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to