This is an automated email from the ASF dual-hosted git repository. linkinstar pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/answer.git
commit 5ff6106d37807ef79fd04ab4ba56a43a961bf6fe Author: ferhat elmas <[email protected]> AuthorDate: Tue Jan 6 22:25:29 2026 +0100 fix: address comments and add a test Signed-off-by: ferhat elmas <[email protected]> --- internal/base/queue/queue.go | 16 +++++++-- internal/base/queue/queue_test.go | 42 ++++++++++++++++++++++++ internal/service/activityqueue/activity_queue.go | 2 +- internal/service/eventqueue/event_queue.go | 2 +- internal/service/noticequeue/notice_queue.go | 4 +-- 5 files changed, 59 insertions(+), 7 deletions(-) diff --git a/internal/base/queue/queue.go b/internal/base/queue/queue.go index ae23d341..b3a8757a 100644 --- a/internal/base/queue/queue.go +++ b/internal/base/queue/queue.go @@ -26,6 +26,17 @@ import ( "github.com/segmentfault/pacman/log" ) +type Service[T any] interface { + // Send enqueues a message to be processed asynchronously. + Send(ctx context.Context, msg T) + + // RegisterHandler sets the handler function for processing messages. + RegisterHandler(handler func(ctx context.Context, msg T) error) + + // Close gracefully shuts down the queue, waiting for pending messages to be processed. + Close() +} + // Queue is a generic message queue service that processes messages asynchronously. // It is thread-safe and supports graceful shutdown. type Queue[T any] struct { @@ -51,10 +62,9 @@ func New[T any](name string, bufferSize int) *Queue[T] { // It will block if the queue is full. func (q *Queue[T]) Send(ctx context.Context, msg T) { q.mu.RLock() - closed := q.closed - q.mu.RUnlock() + defer q.mu.RUnlock() - if closed { + if q.closed { log.Warnf("[%s] queue is closed, dropping message", q.name) return } diff --git a/internal/base/queue/queue_test.go b/internal/base/queue/queue_test.go index 79355fb7..23f0fda7 100644 --- a/internal/base/queue/queue_test.go +++ b/internal/base/queue/queue_test.go @@ -21,6 +21,7 @@ package queue import ( "context" + "fmt" "sync" "sync/atomic" "testing" @@ -209,3 +210,44 @@ func TestQueue_ConcurrentRegisterHandler(t *testing.T) { } wg.Wait() } + +// TestQueue_SendCloseRace is a regression test for the race condition between +// Send and Close. Without proper synchronization, concurrent Send and Close +// calls could cause a "send on closed channel" panic. +// Run with: go test -race -run TestQueue_SendCloseRace +func TestQueue_SendCloseRace(t *testing.T) { + for i := range 100 { + t.Run(fmt.Sprintf("iteration_%d", i), func(t *testing.T) { + // Use large buffer to avoid blocking on channel send while holding RLock + q := New[*testMessage]("test-race", 1000) + q.RegisterHandler(func(ctx context.Context, msg *testMessage) error { + return nil + }) + + var wg sync.WaitGroup + + // Use cancellable context so senders can exit when Close is called + ctx, cancel := context.WithCancel(context.Background()) + + // Start multiple senders + for j := range 10 { + wg.Add(1) + go func(id int) { + defer wg.Done() + for k := range 100 { + q.Send(ctx, &testMessage{ID: id*1000 + k}) + } + }(j) + } + + // Close while senders are still running + go func() { + time.Sleep(time.Microsecond * 10) + cancel() // Cancel context to unblock any waiting senders + q.Close() + }() + + wg.Wait() + }) + } +} diff --git a/internal/service/activityqueue/activity_queue.go b/internal/service/activityqueue/activity_queue.go index d32caf5e..2210977b 100644 --- a/internal/service/activityqueue/activity_queue.go +++ b/internal/service/activityqueue/activity_queue.go @@ -24,7 +24,7 @@ import ( "github.com/apache/answer/internal/schema" ) -type Service = *queue.Queue[*schema.ActivityMsg] +type Service queue.Service[*schema.ActivityMsg] func NewService() Service { return queue.New[*schema.ActivityMsg]("activity", 128) diff --git a/internal/service/eventqueue/event_queue.go b/internal/service/eventqueue/event_queue.go index 8d3a2239..e93a8363 100644 --- a/internal/service/eventqueue/event_queue.go +++ b/internal/service/eventqueue/event_queue.go @@ -24,7 +24,7 @@ import ( "github.com/apache/answer/internal/schema" ) -type Service = *queue.Queue[*schema.EventMsg] +type Service queue.Service[*schema.EventMsg] func NewService() Service { return queue.New[*schema.EventMsg]("event", 128) diff --git a/internal/service/noticequeue/notice_queue.go b/internal/service/noticequeue/notice_queue.go index 138f9ce6..5e4d4b0f 100644 --- a/internal/service/noticequeue/notice_queue.go +++ b/internal/service/noticequeue/notice_queue.go @@ -24,13 +24,13 @@ import ( "github.com/apache/answer/internal/schema" ) -type Service = *queue.Queue[*schema.NotificationMsg] +type Service queue.Service[*schema.NotificationMsg] func NewService() Service { return queue.New[*schema.NotificationMsg]("notification", 128) } -type ExternalService = *queue.Queue[*schema.ExternalNotificationMsg] +type ExternalService queue.Service[*schema.ExternalNotificationMsg] func NewExternalService() ExternalService { return queue.New[*schema.ExternalNotificationMsg]("external_notification", 128)
