This is an automated email from the ASF dual-hosted git repository.

linkinstar pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/answer.git

commit 5ff6106d37807ef79fd04ab4ba56a43a961bf6fe
Author: ferhat elmas <[email protected]>
AuthorDate: Tue Jan 6 22:25:29 2026 +0100

    fix: address comments and add a test
    
    Signed-off-by: ferhat elmas <[email protected]>
---
 internal/base/queue/queue.go                     | 16 +++++++--
 internal/base/queue/queue_test.go                | 42 ++++++++++++++++++++++++
 internal/service/activityqueue/activity_queue.go |  2 +-
 internal/service/eventqueue/event_queue.go       |  2 +-
 internal/service/noticequeue/notice_queue.go     |  4 +--
 5 files changed, 59 insertions(+), 7 deletions(-)

diff --git a/internal/base/queue/queue.go b/internal/base/queue/queue.go
index ae23d341..b3a8757a 100644
--- a/internal/base/queue/queue.go
+++ b/internal/base/queue/queue.go
@@ -26,6 +26,17 @@ import (
        "github.com/segmentfault/pacman/log"
 )
 
+type Service[T any] interface {
+       // Send enqueues a message to be processed asynchronously.
+       Send(ctx context.Context, msg T)
+
+       // RegisterHandler sets the handler function for processing messages.
+       RegisterHandler(handler func(ctx context.Context, msg T) error)
+
+       // Close gracefully shuts down the queue, waiting for pending messages to be processed.
+       Close()
+}
+
 // Queue is a generic message queue service that processes messages asynchronously.
 // It is thread-safe and supports graceful shutdown.
 type Queue[T any] struct {
@@ -51,10 +62,9 @@ func New[T any](name string, bufferSize int) *Queue[T] {
 // It will block if the queue is full.
 func (q *Queue[T]) Send(ctx context.Context, msg T) {
        q.mu.RLock()
-       closed := q.closed
-       q.mu.RUnlock()
+       defer q.mu.RUnlock()
 
-       if closed {
+       if q.closed {
                log.Warnf("[%s] queue is closed, dropping message", q.name)
                return
        }
diff --git a/internal/base/queue/queue_test.go b/internal/base/queue/queue_test.go
index 79355fb7..23f0fda7 100644
--- a/internal/base/queue/queue_test.go
+++ b/internal/base/queue/queue_test.go
@@ -21,6 +21,7 @@ package queue
 
 import (
        "context"
+       "fmt"
        "sync"
        "sync/atomic"
        "testing"
@@ -209,3 +210,44 @@ func TestQueue_ConcurrentRegisterHandler(t *testing.T) {
        }
        wg.Wait()
 }
+
+// TestQueue_SendCloseRace is a regression test for the race condition between
+// Send and Close. Without proper synchronization, concurrent Send and Close
+// calls could cause a "send on closed channel" panic.
+// Run with: go test -race -run TestQueue_SendCloseRace
+func TestQueue_SendCloseRace(t *testing.T) {
+       for i := range 100 {
+               t.Run(fmt.Sprintf("iteration_%d", i), func(t *testing.T) {
+                       // Use large buffer to avoid blocking on channel send while holding RLock
+                       q := New[*testMessage]("test-race", 1000)
+                       q.RegisterHandler(func(ctx context.Context, msg *testMessage) error {
+                               return nil
+                       })
+
+                       var wg sync.WaitGroup
+
+                       // Use cancellable context so senders can exit when Close is called
+                       ctx, cancel := context.WithCancel(context.Background())
+
+                       // Start multiple senders
+                       for j := range 10 {
+                               wg.Add(1)
+                               go func(id int) {
+                                       defer wg.Done()
+                                       for k := range 100 {
+                                               q.Send(ctx, &testMessage{ID: id*1000 + k})
+                                       }
+                               }(j)
+                       }
+
+                       // Close while senders are still running
+                       go func() {
+                               time.Sleep(time.Microsecond * 10)
+                               cancel() // Cancel context to unblock any waiting senders
+                               q.Close()
+                       }()
+
+                       wg.Wait()
+               })
+       }
+}
diff --git a/internal/service/activityqueue/activity_queue.go b/internal/service/activityqueue/activity_queue.go
index d32caf5e..2210977b 100644
--- a/internal/service/activityqueue/activity_queue.go
+++ b/internal/service/activityqueue/activity_queue.go
@@ -24,7 +24,7 @@ import (
        "github.com/apache/answer/internal/schema"
 )
 
-type Service = *queue.Queue[*schema.ActivityMsg]
+type Service queue.Service[*schema.ActivityMsg]
 
 func NewService() Service {
        return queue.New[*schema.ActivityMsg]("activity", 128)
diff --git a/internal/service/eventqueue/event_queue.go b/internal/service/eventqueue/event_queue.go
index 8d3a2239..e93a8363 100644
--- a/internal/service/eventqueue/event_queue.go
+++ b/internal/service/eventqueue/event_queue.go
@@ -24,7 +24,7 @@ import (
        "github.com/apache/answer/internal/schema"
 )
 
-type Service = *queue.Queue[*schema.EventMsg]
+type Service queue.Service[*schema.EventMsg]
 
 func NewService() Service {
        return queue.New[*schema.EventMsg]("event", 128)
diff --git a/internal/service/noticequeue/notice_queue.go b/internal/service/noticequeue/notice_queue.go
index 138f9ce6..5e4d4b0f 100644
--- a/internal/service/noticequeue/notice_queue.go
+++ b/internal/service/noticequeue/notice_queue.go
@@ -24,13 +24,13 @@ import (
        "github.com/apache/answer/internal/schema"
 )
 
-type Service = *queue.Queue[*schema.NotificationMsg]
+type Service queue.Service[*schema.NotificationMsg]
 
 func NewService() Service {
        return queue.New[*schema.NotificationMsg]("notification", 128)
 }
 
-type ExternalService = *queue.Queue[*schema.ExternalNotificationMsg]
+type ExternalService queue.Service[*schema.ExternalNotificationMsg]
 
 func NewExternalService() ExternalService {
        return queue.New[*schema.ExternalNotificationMsg]("external_notification", 128)

Reply via email to