This is an automated email from the ASF dual-hosted git repository.

linkinstar pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/answer.git

commit 26d868b1236e4540bfd8125cc727eee90d7131e9
Author: ferhat elmas <[email protected]>
AuthorDate: Tue Dec 9 01:08:57 2025 +0100

    refactor(queue): improve queues
    
    * fix race condition for registering handler
    * add close method
    * use generics to reduce duplication
    * rename packages to drop underscore for go convention
    * rename interface to drop stutter with package name
    
    Signed-off-by: ferhat elmas <[email protected]>
---
 cmd/wire_gen.go                                    |  54 +++---
 internal/base/queue/queue.go                       | 120 ++++++++++++
 internal/base/queue/queue_test.go                  | 211 +++++++++++++++++++++
 internal/controller/template_controller.go         |   6 +-
 internal/repo/activity/answer_repo.go              |   6 +-
 internal/repo/activity/vote_repo.go                |   6 +-
 internal/service/activity_common/activity.go       |   6 +-
 internal/service/activity_queue/activity_queue.go  |  69 -------
 internal/service/activityqueue/activity_queue.go   |  31 +++
 internal/service/badge/badge_award_service.go      |   6 +-
 internal/service/badge/badge_event_handler.go      |   6 +-
 internal/service/comment/comment_service.go        |  22 +--
 internal/service/content/answer_service.go         |  22 +--
 internal/service/content/question_service.go       |  22 +--
 internal/service/content/revision_service.go       |  12 +-
 internal/service/content/user_service.go           |   6 +-
 internal/service/content/vote_service.go           |   6 +-
 internal/service/event_queue/event_queue.go        |  69 -------
 internal/service/eventqueue/event_queue.go         |  31 +++
 internal/service/meta/meta_service.go              |   6 +-
 .../notice_queue/external_notification_queue.go    |  69 -------
 internal/service/notice_queue/notice_queue.go      |  69 -------
 internal/service/noticequeue/notice_queue.go       |  37 ++++
 .../service/notification/external_notification.go  |   6 +-
 .../service/notification_common/notification.go    |   6 +-
 internal/service/provider.go                       |  14 +-
 internal/service/question_common/question.go       |   6 +-
 internal/service/report/report_service.go          |   6 +-
 internal/service/review/review_service.go          |  10 +-
 internal/service/tag/tag_service.go                |   6 +-
 internal/service/tag_common/tag_common.go          |   6 +-
 31 files changed, 553 insertions(+), 399 deletions(-)

diff --git a/cmd/wire_gen.go b/cmd/wire_gen.go
index aae1c6af..22a70f29 100644
--- a/cmd/wire_gen.go
+++ b/cmd/wire_gen.go
@@ -72,7 +72,7 @@ import (
        "github.com/apache/answer/internal/service/action"
        activity2 "github.com/apache/answer/internal/service/activity"
        activity_common2 
"github.com/apache/answer/internal/service/activity_common"
-       "github.com/apache/answer/internal/service/activity_queue"
+       "github.com/apache/answer/internal/service/activityqueue"
        "github.com/apache/answer/internal/service/answer_common"
        auth2 "github.com/apache/answer/internal/service/auth"
        badge2 "github.com/apache/answer/internal/service/badge"
@@ -83,14 +83,14 @@ import (
        config2 "github.com/apache/answer/internal/service/config"
        "github.com/apache/answer/internal/service/content"
        "github.com/apache/answer/internal/service/dashboard"
-       "github.com/apache/answer/internal/service/event_queue"
+       "github.com/apache/answer/internal/service/eventqueue"
        export2 "github.com/apache/answer/internal/service/export"
        file_record2 "github.com/apache/answer/internal/service/file_record"
        "github.com/apache/answer/internal/service/follow"
        "github.com/apache/answer/internal/service/importer"
        meta2 "github.com/apache/answer/internal/service/meta"
        "github.com/apache/answer/internal/service/meta_common"
-       "github.com/apache/answer/internal/service/notice_queue"
+       "github.com/apache/answer/internal/service/noticequeue"
        "github.com/apache/answer/internal/service/notification"
        "github.com/apache/answer/internal/service/notification_common"
        "github.com/apache/answer/internal/service/object_info"
@@ -172,29 +172,29 @@ func initApplication(debug bool, serverConf *conf.Server, 
dbConf *data.Database,
        tagRepo := tag.NewTagRepo(dataData, uniqueIDRepo)
        revisionRepo := revision.NewRevisionRepo(dataData, uniqueIDRepo)
        revisionService := revision_common.NewRevisionService(revisionRepo, 
userRepo)
-       activityQueueService := activity_queue.NewActivityQueueService()
-       tagCommonService := tag_common2.NewTagCommonService(tagCommonRepo, 
tagRelRepo, tagRepo, revisionService, siteInfoCommonService, 
activityQueueService)
+       v := activityqueue.NewService()
+       tagCommonService := tag_common2.NewTagCommonService(tagCommonRepo, 
tagRelRepo, tagRepo, revisionService, siteInfoCommonService, v)
        collectionRepo := collection.NewCollectionRepo(dataData, uniqueIDRepo)
        collectionCommon := collectioncommon.NewCollectionCommon(collectionRepo)
        answerCommon := answercommon.NewAnswerCommon(answerRepo)
        metaRepo := meta.NewMetaRepo(dataData)
        metaCommonService := metacommon.NewMetaCommonService(metaRepo)
-       questionCommon := questioncommon.NewQuestionCommon(questionRepo, 
answerRepo, voteRepo, followRepo, tagCommonService, userCommon, 
collectionCommon, answerCommon, metaCommonService, configService, 
activityQueueService, revisionRepo, siteInfoCommonService, dataData)
-       eventQueueService := event_queue.NewEventQueueService()
+       questionCommon := questioncommon.NewQuestionCommon(questionRepo, 
answerRepo, voteRepo, followRepo, tagCommonService, userCommon, 
collectionCommon, answerCommon, metaCommonService, configService, v, 
revisionRepo, siteInfoCommonService, dataData)
+       v2 := eventqueue.NewService()
        fileRecordRepo := file_record.NewFileRecordRepo(dataData)
        fileRecordService := file_record2.NewFileRecordService(fileRecordRepo, 
revisionRepo, serviceConf, siteInfoCommonService, userCommon)
-       userService := content.NewUserService(userRepo, userActiveActivityRepo, 
activityRepo, emailService, authService, siteInfoCommonService, 
userRoleRelService, userCommon, userExternalLoginService, 
userNotificationConfigRepo, userNotificationConfigService, questionCommon, 
eventQueueService, fileRecordService)
+       userService := content.NewUserService(userRepo, userActiveActivityRepo, 
activityRepo, emailService, authService, siteInfoCommonService, 
userRoleRelService, userCommon, userExternalLoginService, 
userNotificationConfigRepo, userNotificationConfigService, questionCommon, v2, 
fileRecordService)
        captchaRepo := captcha.NewCaptchaRepo(dataData)
        captchaService := action.NewCaptchaService(captchaRepo)
        userController := controller.NewUserController(authService, 
userService, captchaService, emailService, siteInfoCommonService, 
userNotificationConfigService)
        commentRepo := comment.NewCommentRepo(dataData, uniqueIDRepo)
        commentCommonRepo := comment.NewCommentCommonRepo(dataData, 
uniqueIDRepo)
        objService := object_info.NewObjService(answerRepo, questionRepo, 
commentCommonRepo, tagCommonRepo, tagCommonService)
-       notificationQueueService := notice_queue.NewNotificationQueueService()
-       externalNotificationQueueService := 
notice_queue.NewNewQuestionNotificationQueueService()
+       v3 := noticequeue.NewService()
+       v4 := noticequeue.NewExternalService()
        reviewRepo := review.NewReviewRepo(dataData)
-       reviewService := review2.NewReviewService(reviewRepo, objService, 
userCommon, userRepo, questionRepo, answerRepo, userRoleRelService, 
externalNotificationQueueService, tagCommonService, questionCommon, 
notificationQueueService, siteInfoCommonService, commentCommonRepo)
-       commentService := comment2.NewCommentService(commentRepo, 
commentCommonRepo, userCommon, objService, voteRepo, emailService, userRepo, 
notificationQueueService, externalNotificationQueueService, 
activityQueueService, eventQueueService, reviewService)
+       reviewService := review2.NewReviewService(reviewRepo, objService, 
userCommon, userRepo, questionRepo, answerRepo, userRoleRelService, v4, 
tagCommonService, questionCommon, v3, siteInfoCommonService, commentCommonRepo)
+       commentService := comment2.NewCommentService(commentRepo, 
commentCommonRepo, userCommon, objService, voteRepo, emailService, userRepo, 
v3, v4, v, v2, reviewService)
        rolePowerRelRepo := role.NewRolePowerRelRepo(dataData)
        rolePowerRelService := role2.NewRolePowerRelService(rolePowerRelRepo, 
userRoleRelService)
        rankService := rank2.NewRankService(userCommon, userRankRepo, 
objService, userRoleRelService, rolePowerRelService, configService)
@@ -202,17 +202,17 @@ func initApplication(debug bool, serverConf *conf.Server, 
dbConf *data.Database,
        rateLimitMiddleware := middleware.NewRateLimitMiddleware(limitRepo)
        commentController := controller.NewCommentController(commentService, 
rankService, captchaService, rateLimitMiddleware)
        reportRepo := report.NewReportRepo(dataData, uniqueIDRepo)
-       tagService := tag2.NewTagService(tagRepo, tagCommonService, 
revisionService, followRepo, siteInfoCommonService, activityQueueService)
-       answerActivityRepo := activity.NewAnswerActivityRepo(dataData, 
activityRepo, userRankRepo, notificationQueueService)
+       tagService := tag2.NewTagService(tagRepo, tagCommonService, 
revisionService, followRepo, siteInfoCommonService, v)
+       answerActivityRepo := activity.NewAnswerActivityRepo(dataData, 
activityRepo, userRankRepo, v3)
        answerActivityService := 
activity2.NewAnswerActivityService(answerActivityRepo, configService)
-       externalNotificationService := 
notification.NewExternalNotificationService(dataData, 
userNotificationConfigRepo, followRepo, emailService, userRepo, 
externalNotificationQueueService, userExternalLoginRepo, siteInfoCommonService)
-       questionService := content.NewQuestionService(activityRepo, 
questionRepo, answerRepo, tagCommonService, tagService, questionCommon, 
userCommon, userRepo, userRoleRelService, revisionService, metaCommonService, 
collectionCommon, answerActivityService, emailService, 
notificationQueueService, externalNotificationQueueService, 
activityQueueService, siteInfoCommonService, externalNotificationService, 
reviewService, configService, eventQueueService, reviewRepo)
-       answerService := content.NewAnswerService(answerRepo, questionRepo, 
questionCommon, userCommon, collectionCommon, userRepo, revisionService, 
answerActivityService, answerCommon, voteRepo, emailService, 
userRoleRelService, notificationQueueService, externalNotificationQueueService, 
activityQueueService, reviewService, eventQueueService)
+       externalNotificationService := 
notification.NewExternalNotificationService(dataData, 
userNotificationConfigRepo, followRepo, emailService, userRepo, v4, 
userExternalLoginRepo, siteInfoCommonService)
+       questionService := content.NewQuestionService(activityRepo, 
questionRepo, answerRepo, tagCommonService, tagService, questionCommon, 
userCommon, userRepo, userRoleRelService, revisionService, metaCommonService, 
collectionCommon, answerActivityService, emailService, v3, v4, v, 
siteInfoCommonService, externalNotificationService, reviewService, 
configService, v2, reviewRepo)
+       answerService := content.NewAnswerService(answerRepo, questionRepo, 
questionCommon, userCommon, collectionCommon, userRepo, revisionService, 
answerActivityService, answerCommon, voteRepo, emailService, 
userRoleRelService, v3, v4, v, reviewService, v2)
        reportHandle := report_handle.NewReportHandle(questionService, 
answerService, commentService)
-       reportService := report2.NewReportService(reportRepo, objService, 
userCommon, answerRepo, questionRepo, commentCommonRepo, reportHandle, 
configService, eventQueueService)
+       reportService := report2.NewReportService(reportRepo, objService, 
userCommon, answerRepo, questionRepo, commentCommonRepo, reportHandle, 
configService, v2)
        reportController := controller.NewReportController(reportService, 
rankService, captchaService)
-       contentVoteRepo := activity.NewVoteRepo(dataData, activityRepo, 
userRankRepo, notificationQueueService)
-       voteService := content.NewVoteService(contentVoteRepo, configService, 
questionRepo, answerRepo, commentCommonRepo, objService, eventQueueService)
+       contentVoteRepo := activity.NewVoteRepo(dataData, activityRepo, 
userRankRepo, v3)
+       voteService := content.NewVoteService(contentVoteRepo, configService, 
questionRepo, answerRepo, commentCommonRepo, objService, v2)
        voteController := controller.NewVoteController(voteService, 
rankService, captchaService)
        tagController := controller.NewTagController(tagService, 
tagCommonService, rankService)
        followFollowRepo := activity.NewFollowRepo(dataData, uniqueIDRepo, 
activityRepo)
@@ -228,7 +228,7 @@ func initApplication(debug bool, serverConf *conf.Server, 
dbConf *data.Database,
        searchService := content.NewSearchService(searchParser, searchRepo)
        searchController := controller.NewSearchController(searchService, 
captchaService)
        reviewActivityRepo := activity.NewReviewActivityRepo(dataData, 
activityRepo, userRankRepo, configService)
-       contentRevisionService := content.NewRevisionService(revisionRepo, 
userCommon, questionCommon, answerService, objService, questionRepo, 
answerRepo, tagRepo, tagCommonService, notificationQueueService, 
activityQueueService, reportRepo, reviewService, reviewActivityRepo)
+       contentRevisionService := content.NewRevisionService(revisionRepo, 
userCommon, questionCommon, answerService, objService, questionRepo, 
answerRepo, tagRepo, tagCommonService, v3, v, reportRepo, reviewService, 
reviewActivityRepo)
        revisionController := 
controller.NewRevisionController(contentRevisionService, rankService)
        rankController := controller.NewRankController(rankService)
        userAdminRepo := user.NewUserAdminRepo(dataData, authRepo)
@@ -244,7 +244,7 @@ func initApplication(debug bool, serverConf *conf.Server, 
dbConf *data.Database,
        siteInfoService := siteinfo.NewSiteInfoService(siteInfoRepo, 
siteInfoCommonService, emailService, tagCommonService, configService, 
questionCommon, fileRecordService)
        siteInfoController := 
controller_admin.NewSiteInfoController(siteInfoService)
        controllerSiteInfoController := 
controller.NewSiteInfoController(siteInfoCommonService)
-       notificationCommon := 
notificationcommon.NewNotificationCommon(dataData, notificationRepo, 
userCommon, activityRepo, followRepo, objService, notificationQueueService, 
userExternalLoginRepo, siteInfoCommonService)
+       notificationCommon := 
notificationcommon.NewNotificationCommon(dataData, notificationRepo, 
userCommon, activityRepo, followRepo, objService, v3, userExternalLoginRepo, 
siteInfoCommonService)
        badgeRepo := badge.NewBadgeRepo(dataData, uniqueIDRepo)
        notificationService := notification.NewNotificationService(dataData, 
notificationRepo, notificationCommon, revisionService, userRepo, reportRepo, 
reviewService, badgeRepo)
        notificationController := 
controller.NewNotificationController(notificationService, rankService)
@@ -253,7 +253,7 @@ func initApplication(debug bool, serverConf *conf.Server, 
dbConf *data.Database,
        uploaderService := uploader.NewUploaderService(serviceConf, 
siteInfoCommonService, fileRecordService)
        uploadController := controller.NewUploadController(uploaderService)
        activityActivityRepo := activity.NewActivityRepo(dataData, 
configService)
-       activityCommon := activity_common2.NewActivityCommon(activityRepo, 
activityQueueService)
+       activityCommon := activity_common2.NewActivityCommon(activityRepo, v)
        commentCommonService := 
comment_common.NewCommentCommonService(commentCommonRepo)
        activityService := activity2.NewActivityService(activityActivityRepo, 
userCommon, activityCommon, tagCommonService, objService, commentCommonService, 
revisionService, metaCommonService, configService)
        activityController := controller.NewActivityController(activityService)
@@ -265,12 +265,12 @@ func initApplication(debug bool, serverConf *conf.Server, 
dbConf *data.Database,
        permissionController := controller.NewPermissionController(rankService)
        userPluginController := 
controller.NewUserPluginController(pluginCommonService)
        reviewController := controller.NewReviewController(reviewService, 
rankService, captchaService)
-       metaService := meta2.NewMetaService(metaCommonService, userCommon, 
answerRepo, questionRepo, eventQueueService)
+       metaService := meta2.NewMetaService(metaCommonService, userCommon, 
answerRepo, questionRepo, v2)
        metaController := controller.NewMetaController(metaService)
        badgeGroupRepo := badge_group.NewBadgeGroupRepo(dataData, uniqueIDRepo)
        eventRuleRepo := badge.NewEventRuleRepo(dataData)
-       badgeAwardService := badge2.NewBadgeAwardService(badgeAwardRepo, 
badgeRepo, userCommon, objService, notificationQueueService)
-       badgeEventService := badge2.NewBadgeEventService(dataData, 
eventQueueService, badgeRepo, eventRuleRepo, badgeAwardService)
+       badgeAwardService := badge2.NewBadgeAwardService(badgeAwardRepo, 
badgeRepo, userCommon, objService, v3)
+       badgeEventService := badge2.NewBadgeEventService(dataData, v2, 
badgeRepo, eventRuleRepo, badgeAwardService)
        badgeService := badge2.NewBadgeService(badgeRepo, badgeGroupRepo, 
badgeAwardRepo, badgeEventService, siteInfoCommonService)
        badgeController := controller.NewBadgeController(badgeService, 
badgeAwardService)
        controller_adminBadgeController := 
controller_admin.NewBadgeController(badgeService)
@@ -281,7 +281,7 @@ func initApplication(debug bool, serverConf *conf.Server, 
dbConf *data.Database,
        avatarMiddleware := middleware.NewAvatarMiddleware(serviceConf, 
uploaderService)
        shortIDMiddleware := 
middleware.NewShortIDMiddleware(siteInfoCommonService)
        templateRenderController := 
templaterender.NewTemplateRenderController(questionService, userService, 
tagService, answerService, commentService, siteInfoCommonService, questionRepo)
-       templateController := 
controller.NewTemplateController(templateRenderController, 
siteInfoCommonService, eventQueueService, userService, questionService)
+       templateController := 
controller.NewTemplateController(templateRenderController, 
siteInfoCommonService, v2, userService, questionService)
        templateRouter := router.NewTemplateRouter(templateController, 
templateRenderController, siteInfoController, authUserMiddleware)
        connectorController := 
controller.NewConnectorController(siteInfoCommonService, emailService, 
userExternalLoginService)
        userCenterLoginService := 
user_external_login2.NewUserCenterLoginService(userRepo, userCommon, 
userExternalLoginRepo, userActiveActivityRepo, siteInfoCommonService)
diff --git a/internal/base/queue/queue.go b/internal/base/queue/queue.go
new file mode 100644
index 00000000..ae23d341
--- /dev/null
+++ b/internal/base/queue/queue.go
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package queue
+
+import (
+       "context"
+       "sync"
+
+       "github.com/segmentfault/pacman/log"
+)
+
+// Queue is a generic message queue service that processes messages asynchronously.
+// It is thread-safe and supports graceful shutdown; Send must not race with Close.
+type Queue[T any] struct {
+       name    string
+       queue   chan T
+       handler func(ctx context.Context, msg T) error
+       mu      sync.RWMutex
+       closed  bool
+       wg      sync.WaitGroup
+}
+
+// New creates a new queue with the given name and buffer size.
+func New[T any](name string, bufferSize int) *Queue[T] {
+       q := &Queue[T]{
+               name:  name,
+               queue: make(chan T, bufferSize),
+       }
+       q.startWorker()
+       return q
+}
+
+// Send enqueues msg for asynchronous processing, or drops it if closed.
+// It blocks while the queue is full until space frees up or ctx is done.
+func (q *Queue[T]) Send(ctx context.Context, msg T) {
+       q.mu.RLock()
+       closed := q.closed
+       q.mu.RUnlock()
+
+       if closed {
+               log.Warnf("[%s] queue is closed, dropping message", q.name)
+               return
+       }
+
+       select {
+       case q.queue <- msg:
+               log.Debugf("[%s] enqueued message: %+v", q.name, msg)
+       case <-ctx.Done():
+               log.Warnf("[%s] context cancelled while sending message", 
q.name)
+       }
+}
+
+// RegisterHandler sets the handler function for processing messages.
+// This is thread-safe and can be called at any time.
+func (q *Queue[T]) RegisterHandler(handler func(ctx context.Context, msg T) 
error) {
+       q.mu.Lock()
+       defer q.mu.Unlock()
+       q.handler = handler
+}
+
+// Close gracefully shuts down the queue, waiting for pending messages to be processed.
+func (q *Queue[T]) Close() {
+       q.mu.Lock()
+       if q.closed {
+               q.mu.Unlock()
+               return
+       }
+       q.closed = true
+       q.mu.Unlock()
+
+       close(q.queue)
+       q.wg.Wait()
+       log.Infof("[%s] queue closed", q.name)
+}
+
+// startWorker starts the background goroutine that processes messages.
+func (q *Queue[T]) startWorker() {
+       q.wg.Add(1)
+       go func() {
+               defer q.wg.Done()
+               for msg := range q.queue {
+                       q.processMessage(msg)
+               }
+       }()
+}
+
+// processMessage handles a single message with proper synchronization.
+func (q *Queue[T]) processMessage(msg T) {
+       q.mu.RLock()
+       handler := q.handler
+       q.mu.RUnlock()
+
+       if handler == nil {
+               log.Warnf("[%s] no handler registered, dropping message: %+v", 
q.name, msg)
+               return
+       }
+
+       // Use a placeholder context (context.TODO) for async processing
+       // TODO: Consider adding timeout or using a derived context
+       if err := handler(context.TODO(), msg); err != nil {
+               log.Errorf("[%s] handler error: %v", q.name, err)
+       }
+}
diff --git a/internal/base/queue/queue_test.go 
b/internal/base/queue/queue_test.go
new file mode 100644
index 00000000..79355fb7
--- /dev/null
+++ b/internal/base/queue/queue_test.go
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package queue
+
+import (
+       "context"
+       "sync"
+       "sync/atomic"
+       "testing"
+       "time"
+)
+
+type testMessage struct {
+       ID   int
+       Data string
+}
+
+func TestQueue_SendAndReceive(t *testing.T) {
+       q := New[*testMessage]("test", 10)
+       defer q.Close()
+
+       received := make(chan *testMessage, 1)
+       q.RegisterHandler(func(ctx context.Context, msg *testMessage) error {
+               received <- msg
+               return nil
+       })
+
+       msg := &testMessage{ID: 1, Data: "hello"}
+       q.Send(context.Background(), msg)
+
+       select {
+       case r := <-received:
+               if r.ID != msg.ID || r.Data != msg.Data {
+                       t.Errorf("received message mismatch: got %+v, want 
%+v", r, msg)
+               }
+       case <-time.After(time.Second):
+               t.Fatal("timeout waiting for message")
+       }
+}
+
+func TestQueue_MultipleMessages(t *testing.T) {
+       q := New[*testMessage]("test", 10)
+       defer q.Close()
+
+       var count atomic.Int32
+       var wg sync.WaitGroup
+       numMessages := 100
+       wg.Add(numMessages)
+
+       q.RegisterHandler(func(ctx context.Context, msg *testMessage) error {
+               count.Add(1)
+               wg.Done()
+               return nil
+       })
+
+       for i := range numMessages {
+               q.Send(context.Background(), &testMessage{ID: i})
+       }
+
+       done := make(chan struct{})
+       go func() {
+               wg.Wait()
+               close(done)
+       }()
+
+       select {
+       case <-done:
+               if int(count.Load()) != numMessages {
+                       t.Errorf("expected %d messages, got %d", numMessages, 
count.Load())
+               }
+       case <-time.After(5 * time.Second):
+               t.Fatalf("timeout: only received %d of %d messages", 
count.Load(), numMessages)
+       }
+}
+
+func TestQueue_NoHandlerDropsMessage(t *testing.T) {
+       q := New[*testMessage]("test", 10)
+       defer q.Close()
+
+       // Send without handler - should not panic
+       q.Send(context.Background(), &testMessage{ID: 1})
+
+       // Give time for the message to be processed (dropped)
+       time.Sleep(100 * time.Millisecond)
+}
+
+func TestQueue_RegisterHandlerAfterSend(t *testing.T) {
+       q := New[*testMessage]("test", 10)
+       defer q.Close()
+
+       received := make(chan *testMessage, 1)
+
+       // Send first
+       q.Send(context.Background(), &testMessage{ID: 1})
+
+       // Small delay then register handler
+       time.Sleep(50 * time.Millisecond)
+       q.RegisterHandler(func(ctx context.Context, msg *testMessage) error {
+               received <- msg
+               return nil
+       })
+
+       // Send another message that should be received
+       q.Send(context.Background(), &testMessage{ID: 2})
+
+       select {
+       case r := <-received:
+               if r.ID != 2 {
+                       // First message was dropped (no handler), second should be received
+                       t.Logf("received message ID: %d", r.ID)
+               }
+       case <-time.After(time.Second):
+               t.Fatal("timeout waiting for message")
+       }
+}
+
+func TestQueue_Close(t *testing.T) {
+       q := New[*testMessage]("test", 10)
+
+       var count atomic.Int32
+       q.RegisterHandler(func(ctx context.Context, msg *testMessage) error {
+               count.Add(1)
+               return nil
+       })
+
+       // Send some messages
+       for i := range 5 {
+               q.Send(context.Background(), &testMessage{ID: i})
+       }
+
+       // Close and wait
+       q.Close()
+
+       // All messages should have been processed
+       if count.Load() != 5 {
+               t.Errorf("expected 5 messages processed, got %d", count.Load())
+       }
+
+       // Sending after close should not panic
+       q.Send(context.Background(), &testMessage{ID: 99})
+}
+
+func TestQueue_ConcurrentSend(t *testing.T) {
+       q := New[*testMessage]("test", 100)
+       defer q.Close()
+
+       var count atomic.Int32
+       q.RegisterHandler(func(ctx context.Context, msg *testMessage) error {
+               count.Add(1)
+               return nil
+       })
+
+       var wg sync.WaitGroup
+       numGoroutines := 10
+       messagesPerGoroutine := 100
+
+       for i := range numGoroutines {
+               wg.Add(1)
+               go func(id int) {
+                       defer wg.Done()
+                       for j := range messagesPerGoroutine {
+                               q.Send(context.Background(), &testMessage{ID: 
id*1000 + j})
+                       }
+               }(i)
+       }
+
+       wg.Wait()
+
+       // Wait for processing
+       time.Sleep(500 * time.Millisecond)
+
+       expected := int32(numGoroutines * messagesPerGoroutine)
+       if count.Load() != expected {
+               t.Errorf("expected %d messages, got %d", expected, count.Load())
+       }
+}
+
+func TestQueue_ConcurrentRegisterHandler(t *testing.T) {
+       q := New[*testMessage]("test", 10)
+       defer q.Close()
+
+       // Concurrently register handlers - should not race
+       var wg sync.WaitGroup
+       for range 10 {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       q.RegisterHandler(func(ctx context.Context, msg 
*testMessage) error {
+                               return nil
+                       })
+               }()
+       }
+       wg.Wait()
+}
diff --git a/internal/controller/template_controller.go 
b/internal/controller/template_controller.go
index 257b02fa..e6b94f4f 100644
--- a/internal/controller/template_controller.go
+++ b/internal/controller/template_controller.go
@@ -32,7 +32,7 @@ import (
        "github.com/apache/answer/internal/base/middleware"
        "github.com/apache/answer/internal/base/pager"
        "github.com/apache/answer/internal/service/content"
-       "github.com/apache/answer/internal/service/event_queue"
+       "github.com/apache/answer/internal/service/eventqueue"
        "github.com/apache/answer/plugin"
 
        "github.com/apache/answer/internal/base/constant"
@@ -59,7 +59,7 @@ type TemplateController struct {
        cssPath                  string
        templateRenderController *templaterender.TemplateRenderController
        siteInfoService          siteinfo_common.SiteInfoCommonService
-       eventQueueService        event_queue.EventQueueService
+       eventQueueService        eventqueue.Service
        userService              *content.UserService
        questionService          *content.QuestionService
 }
@@ -68,7 +68,7 @@ type TemplateController struct {
 func NewTemplateController(
        templateRenderController *templaterender.TemplateRenderController,
        siteInfoService siteinfo_common.SiteInfoCommonService,
-       eventQueueService event_queue.EventQueueService,
+       eventQueueService eventqueue.Service,
        userService *content.UserService,
        questionService *content.QuestionService,
 ) *TemplateController {
diff --git a/internal/repo/activity/answer_repo.go 
b/internal/repo/activity/answer_repo.go
index 4aca874a..96813f50 100644
--- a/internal/repo/activity/answer_repo.go
+++ b/internal/repo/activity/answer_repo.go
@@ -34,7 +34,7 @@ import (
        "github.com/apache/answer/internal/schema"
        "github.com/apache/answer/internal/service/activity"
        "github.com/apache/answer/internal/service/activity_common"
-       "github.com/apache/answer/internal/service/notice_queue"
+       "github.com/apache/answer/internal/service/noticequeue"
        "github.com/apache/answer/internal/service/rank"
        "github.com/apache/answer/pkg/converter"
        "github.com/segmentfault/pacman/errors"
@@ -46,7 +46,7 @@ type AnswerActivityRepo struct {
        data                     *data.Data
        activityRepo             activity_common.ActivityRepo
        userRankRepo             rank.UserRankRepo
-       notificationQueueService notice_queue.NotificationQueueService
+       notificationQueueService noticequeue.Service
 }
 
 // NewAnswerActivityRepo new repository
@@ -54,7 +54,7 @@ func NewAnswerActivityRepo(
        data *data.Data,
        activityRepo activity_common.ActivityRepo,
        userRankRepo rank.UserRankRepo,
-       notificationQueueService notice_queue.NotificationQueueService,
+       notificationQueueService noticequeue.Service,
 ) activity.AnswerActivityRepo {
        return &AnswerActivityRepo{
                data:                     data,
diff --git a/internal/repo/activity/vote_repo.go b/internal/repo/activity/vote_repo.go
index f2d2be5f..389ae18d 100644
--- a/internal/repo/activity/vote_repo.go
+++ b/internal/repo/activity/vote_repo.go
@@ -28,7 +28,7 @@ import (
        "github.com/segmentfault/pacman/log"
 
        "github.com/apache/answer/internal/base/constant"
-       "github.com/apache/answer/internal/service/notice_queue"
+       "github.com/apache/answer/internal/service/noticequeue"
        "github.com/apache/answer/pkg/converter"
 
        "github.com/apache/answer/internal/base/pager"
@@ -51,7 +51,7 @@ type VoteRepo struct {
        data                     *data.Data
        activityRepo             activity_common.ActivityRepo
        userRankRepo             rank.UserRankRepo
-       notificationQueueService notice_queue.NotificationQueueService
+       notificationQueueService noticequeue.Service
 }
 
 // NewVoteRepo new repository
@@ -59,7 +59,7 @@ func NewVoteRepo(
        data *data.Data,
        activityRepo activity_common.ActivityRepo,
        userRankRepo rank.UserRankRepo,
-       notificationQueueService notice_queue.NotificationQueueService,
+       notificationQueueService noticequeue.Service,
 ) content.VoteRepo {
        return &VoteRepo{
                data:                     data,
diff --git a/internal/service/activity_common/activity.go b/internal/service/activity_common/activity.go
index 74f73a75..3d2efd6a 100644
--- a/internal/service/activity_common/activity.go
+++ b/internal/service/activity_common/activity.go
@@ -25,7 +25,7 @@ import (
 
        "github.com/apache/answer/internal/entity"
        "github.com/apache/answer/internal/schema"
-       "github.com/apache/answer/internal/service/activity_queue"
+       "github.com/apache/answer/internal/service/activityqueue"
        "github.com/apache/answer/pkg/converter"
        "github.com/apache/answer/pkg/uid"
        "github.com/segmentfault/pacman/log"
@@ -49,13 +49,13 @@ type ActivityRepo interface {
 
 type ActivityCommon struct {
        activityRepo         ActivityRepo
-       activityQueueService activity_queue.ActivityQueueService
+       activityQueueService activityqueue.Service
 }
 
 // NewActivityCommon new activity common
 func NewActivityCommon(
        activityRepo ActivityRepo,
-       activityQueueService activity_queue.ActivityQueueService,
+       activityQueueService activityqueue.Service,
 ) *ActivityCommon {
        activity := &ActivityCommon{
                activityRepo:         activityRepo,
diff --git a/internal/service/activity_queue/activity_queue.go b/internal/service/activity_queue/activity_queue.go
deleted file mode 100644
index 7b8c1e3b..00000000
--- a/internal/service/activity_queue/activity_queue.go
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package activity_queue
-
-import (
-       "context"
-
-       "github.com/apache/answer/internal/schema"
-       "github.com/segmentfault/pacman/log"
-)
-
-type ActivityQueueService interface {
-       Send(ctx context.Context, msg *schema.ActivityMsg)
-       RegisterHandler(handler func(ctx context.Context, msg *schema.ActivityMsg) error)
-}
-
-type activityQueueService struct {
-       Queue   chan *schema.ActivityMsg
-       Handler func(ctx context.Context, msg *schema.ActivityMsg) error
-}
-
-func (ns *activityQueueService) Send(ctx context.Context, msg *schema.ActivityMsg) {
-       ns.Queue <- msg
-}
-
-func (ns *activityQueueService) RegisterHandler(
-       handler func(ctx context.Context, msg *schema.ActivityMsg) error) {
-       ns.Handler = handler
-}
-
-func (ns *activityQueueService) working() {
-       go func() {
-               for msg := range ns.Queue {
-                       log.Debugf("received activity %+v", msg)
-                       if ns.Handler == nil {
-                               log.Warnf("no handler for activity")
-                               continue
-                       }
-                       if err := ns.Handler(context.Background(), msg); err != nil {
-                               log.Error(err)
-                       }
-               }
-       }()
-}
-
-// NewActivityQueueService create a new activity queue service
-func NewActivityQueueService() ActivityQueueService {
-       ns := &activityQueueService{}
-       ns.Queue = make(chan *schema.ActivityMsg, 128)
-       ns.working()
-       return ns
-}
diff --git a/internal/service/activityqueue/activity_queue.go b/internal/service/activityqueue/activity_queue.go
new file mode 100644
index 00000000..d32caf5e
--- /dev/null
+++ b/internal/service/activityqueue/activity_queue.go
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package activityqueue
+
+import (
+       "github.com/apache/answer/internal/base/queue"
+       "github.com/apache/answer/internal/schema"
+)
+
+type Service = *queue.Queue[*schema.ActivityMsg]
+
+func NewService() Service {
+       return queue.New[*schema.ActivityMsg]("activity", 128)
+}
diff --git a/internal/service/badge/badge_award_service.go b/internal/service/badge/badge_award_service.go
index 982c1d1a..0799b87c 100644
--- a/internal/service/badge/badge_award_service.go
+++ b/internal/service/badge/badge_award_service.go
@@ -28,7 +28,7 @@ import (
        "github.com/apache/answer/internal/base/translator"
        "github.com/apache/answer/internal/entity"
        "github.com/apache/answer/internal/schema"
-       "github.com/apache/answer/internal/service/notice_queue"
+       "github.com/apache/answer/internal/service/noticequeue"
        "github.com/apache/answer/internal/service/object_info"
        usercommon "github.com/apache/answer/internal/service/user_common"
        "github.com/apache/answer/pkg/uid"
@@ -62,7 +62,7 @@ type BadgeAwardService struct {
        badgeRepo                BadgeRepo
        userCommon               *usercommon.UserCommon
        objectInfoService        *object_info.ObjService
-       notificationQueueService notice_queue.NotificationQueueService
+       notificationQueueService noticequeue.Service
 }
 
 func NewBadgeAwardService(
@@ -70,7 +70,7 @@ func NewBadgeAwardService(
        badgeRepo BadgeRepo,
        userCommon *usercommon.UserCommon,
        objectInfoService *object_info.ObjService,
-       notificationQueueService notice_queue.NotificationQueueService,
+       notificationQueueService noticequeue.Service,
 ) *BadgeAwardService {
        return &BadgeAwardService{
                badgeAwardRepo:           badgeAwardRepo,
diff --git a/internal/service/badge/badge_event_handler.go b/internal/service/badge/badge_event_handler.go
index 24cabf29..0a9a84c0 100644
--- a/internal/service/badge/badge_event_handler.go
+++ b/internal/service/badge/badge_event_handler.go
@@ -25,13 +25,13 @@ import (
        "github.com/apache/answer/internal/base/data"
        "github.com/apache/answer/internal/entity"
        "github.com/apache/answer/internal/schema"
-       "github.com/apache/answer/internal/service/event_queue"
+       "github.com/apache/answer/internal/service/eventqueue"
        "github.com/segmentfault/pacman/log"
 )
 
 type BadgeEventService struct {
        data              *data.Data
-       eventQueueService event_queue.EventQueueService
+       eventQueueService eventqueue.Service
        badgeRepo         BadgeRepo
        eventRuleRepo     EventRuleRepo
        badgeAwardService *BadgeAwardService
@@ -45,7 +45,7 @@ type EventRuleRepo interface {
 
 func NewBadgeEventService(
        data *data.Data,
-       eventQueueService event_queue.EventQueueService,
+       eventQueueService eventqueue.Service,
        badgeRepo BadgeRepo,
        eventRuleRepo EventRuleRepo,
        badgeAwardService *BadgeAwardService,
diff --git a/internal/service/comment/comment_service.go b/internal/service/comment/comment_service.go
index dc599e6d..30ff43c6 100644
--- a/internal/service/comment/comment_service.go
+++ b/internal/service/comment/comment_service.go
@@ -22,7 +22,7 @@ package comment
 import (
        "context"
 
-       "github.com/apache/answer/internal/service/event_queue"
+       "github.com/apache/answer/internal/service/eventqueue"
        "github.com/apache/answer/internal/service/review"
 
        "time"
@@ -33,10 +33,10 @@ import (
        "github.com/apache/answer/internal/entity"
        "github.com/apache/answer/internal/schema"
        "github.com/apache/answer/internal/service/activity_common"
-       "github.com/apache/answer/internal/service/activity_queue"
+       "github.com/apache/answer/internal/service/activityqueue"
        "github.com/apache/answer/internal/service/comment_common"
        "github.com/apache/answer/internal/service/export"
-       "github.com/apache/answer/internal/service/notice_queue"
+       "github.com/apache/answer/internal/service/noticequeue"
        "github.com/apache/answer/internal/service/object_info"
        "github.com/apache/answer/internal/service/permission"
        usercommon "github.com/apache/answer/internal/service/user_common"
@@ -88,10 +88,10 @@ type CommentService struct {
        objectInfoService                *object_info.ObjService
        emailService                     *export.EmailService
        userRepo                         usercommon.UserRepo
-       notificationQueueService         notice_queue.NotificationQueueService
-       externalNotificationQueueService notice_queue.ExternalNotificationQueueService
-       activityQueueService             activity_queue.ActivityQueueService
-       eventQueueService                event_queue.EventQueueService
+       notificationQueueService         noticequeue.Service
+       externalNotificationQueueService noticequeue.ExternalService
+       activityQueueService             activityqueue.Service
+       eventQueueService                eventqueue.Service
        reviewService                    *review.ReviewService
 }
 
@@ -104,10 +104,10 @@ func NewCommentService(
        voteCommon activity_common.VoteRepo,
        emailService *export.EmailService,
        userRepo usercommon.UserRepo,
-       notificationQueueService notice_queue.NotificationQueueService,
-       externalNotificationQueueService notice_queue.ExternalNotificationQueueService,
-       activityQueueService activity_queue.ActivityQueueService,
-       eventQueueService event_queue.EventQueueService,
+       notificationQueueService noticequeue.Service,
+       externalNotificationQueueService noticequeue.ExternalService,
+       activityQueueService activityqueue.Service,
+       eventQueueService eventqueue.Service,
        reviewService *review.ReviewService,
 ) *CommentService {
        return &CommentService{
diff --git a/internal/service/content/answer_service.go b/internal/service/content/answer_service.go
index d3aab20b..2ad87517 100644
--- a/internal/service/content/answer_service.go
+++ b/internal/service/content/answer_service.go
@@ -24,7 +24,7 @@ import (
        "encoding/json"
        "time"
 
-       "github.com/apache/answer/internal/service/event_queue"
+       "github.com/apache/answer/internal/service/eventqueue"
 
        "github.com/apache/answer/internal/base/constant"
        "github.com/apache/answer/internal/base/reason"
@@ -32,11 +32,11 @@ import (
        "github.com/apache/answer/internal/schema"
        "github.com/apache/answer/internal/service/activity"
        "github.com/apache/answer/internal/service/activity_common"
-       "github.com/apache/answer/internal/service/activity_queue"
+       "github.com/apache/answer/internal/service/activityqueue"
        answercommon "github.com/apache/answer/internal/service/answer_common"
        collectioncommon "github.com/apache/answer/internal/service/collection_common"
        "github.com/apache/answer/internal/service/export"
-       "github.com/apache/answer/internal/service/notice_queue"
+       "github.com/apache/answer/internal/service/noticequeue"
        "github.com/apache/answer/internal/service/permission"
        questioncommon "github.com/apache/answer/internal/service/question_common"
        "github.com/apache/answer/internal/service/review"
@@ -65,11 +65,11 @@ type AnswerService struct {
        voteRepo                         activity_common.VoteRepo
        emailService                     *export.EmailService
        roleService                      *role.UserRoleRelService
-       notificationQueueService         notice_queue.NotificationQueueService
-       externalNotificationQueueService notice_queue.ExternalNotificationQueueService
-       activityQueueService             activity_queue.ActivityQueueService
+       notificationQueueService         noticequeue.Service
+       externalNotificationQueueService noticequeue.ExternalService
+       activityQueueService             activityqueue.Service
        reviewService                    *review.ReviewService
-       eventQueueService                event_queue.EventQueueService
+       eventQueueService                eventqueue.Service
 }
 
 func NewAnswerService(
@@ -85,11 +85,11 @@ func NewAnswerService(
        voteRepo activity_common.VoteRepo,
        emailService *export.EmailService,
        roleService *role.UserRoleRelService,
-       notificationQueueService notice_queue.NotificationQueueService,
-       externalNotificationQueueService notice_queue.ExternalNotificationQueueService,
-       activityQueueService activity_queue.ActivityQueueService,
+       notificationQueueService noticequeue.Service,
+       externalNotificationQueueService noticequeue.ExternalService,
+       activityQueueService activityqueue.Service,
        reviewService *review.ReviewService,
-       eventQueueService event_queue.EventQueueService,
+       eventQueueService eventqueue.Service,
 ) *AnswerService {
        return &AnswerService{
                answerRepo:                       answerRepo,
diff --git a/internal/service/content/question_service.go b/internal/service/content/question_service.go
index b8372a72..bc3ac0bb 100644
--- a/internal/service/content/question_service.go
+++ b/internal/service/content/question_service.go
@@ -25,7 +25,7 @@ import (
        "strings"
        "time"
 
-       "github.com/apache/answer/internal/service/event_queue"
+       "github.com/apache/answer/internal/service/eventqueue"
        "github.com/apache/answer/plugin"
 
        "github.com/apache/answer/internal/base/constant"
@@ -38,13 +38,13 @@ import (
        "github.com/apache/answer/internal/schema"
        "github.com/apache/answer/internal/service/activity"
        "github.com/apache/answer/internal/service/activity_common"
-       "github.com/apache/answer/internal/service/activity_queue"
+       "github.com/apache/answer/internal/service/activityqueue"
        answercommon "github.com/apache/answer/internal/service/answer_common"
        collectioncommon "github.com/apache/answer/internal/service/collection_common"
        "github.com/apache/answer/internal/service/config"
        "github.com/apache/answer/internal/service/export"
        metacommon "github.com/apache/answer/internal/service/meta_common"
-       "github.com/apache/answer/internal/service/notice_queue"
+       "github.com/apache/answer/internal/service/noticequeue"
        "github.com/apache/answer/internal/service/notification"
        "github.com/apache/answer/internal/service/permission"
        questioncommon "github.com/apache/answer/internal/service/question_common"
@@ -84,14 +84,14 @@ type QuestionService struct {
        collectionCommon                 *collectioncommon.CollectionCommon
        answerActivityService            *activity.AnswerActivityService
        emailService                     *export.EmailService
-       notificationQueueService         notice_queue.NotificationQueueService
-       externalNotificationQueueService notice_queue.ExternalNotificationQueueService
-       activityQueueService             activity_queue.ActivityQueueService
+       notificationQueueService         noticequeue.Service
+       externalNotificationQueueService noticequeue.ExternalService
+       activityQueueService             activityqueue.Service
        siteInfoService                  siteinfo_common.SiteInfoCommonService
        newQuestionNotificationService   *notification.ExternalNotificationService
        reviewService                    *review.ReviewService
        configService                    *config.ConfigService
-       eventQueueService                event_queue.EventQueueService
+       eventQueueService                eventqueue.Service
        reviewRepo                       review.ReviewRepo
 }
 
@@ -110,14 +110,14 @@ func NewQuestionService(
        collectionCommon *collectioncommon.CollectionCommon,
        answerActivityService *activity.AnswerActivityService,
        emailService *export.EmailService,
-       notificationQueueService notice_queue.NotificationQueueService,
-       externalNotificationQueueService notice_queue.ExternalNotificationQueueService,
-       activityQueueService activity_queue.ActivityQueueService,
+       notificationQueueService noticequeue.Service,
+       externalNotificationQueueService noticequeue.ExternalService,
+       activityQueueService activityqueue.Service,
        siteInfoService siteinfo_common.SiteInfoCommonService,
        newQuestionNotificationService *notification.ExternalNotificationService,
        reviewService *review.ReviewService,
        configService *config.ConfigService,
-       eventQueueService event_queue.EventQueueService,
+       eventQueueService eventqueue.Service,
        reviewRepo review.ReviewRepo,
 ) *QuestionService {
        return &QuestionService{
diff --git a/internal/service/content/revision_service.go b/internal/service/content/revision_service.go
index 4ac08e76..13ec65b7 100644
--- a/internal/service/content/revision_service.go
+++ b/internal/service/content/revision_service.go
@@ -32,9 +32,9 @@ import (
        "github.com/apache/answer/internal/entity"
        "github.com/apache/answer/internal/schema"
        "github.com/apache/answer/internal/service/activity"
-       "github.com/apache/answer/internal/service/activity_queue"
+       "github.com/apache/answer/internal/service/activityqueue"
        answercommon "github.com/apache/answer/internal/service/answer_common"
-       "github.com/apache/answer/internal/service/notice_queue"
+       "github.com/apache/answer/internal/service/noticequeue"
        "github.com/apache/answer/internal/service/object_info"
        questioncommon "github.com/apache/answer/internal/service/question_common"
        "github.com/apache/answer/internal/service/report_common"
@@ -62,8 +62,8 @@ type RevisionService struct {
        answerRepo               answercommon.AnswerRepo
        tagRepo                  tag_common.TagRepo
        tagCommon                *tag_common.TagCommonService
-       notificationQueueService notice_queue.NotificationQueueService
-       activityQueueService     activity_queue.ActivityQueueService
+       notificationQueueService noticequeue.Service
+       activityQueueService     activityqueue.Service
        reportRepo               report_common.ReportRepo
        reviewService            *review.ReviewService
        reviewActivity           activity.ReviewActivityRepo
@@ -79,8 +79,8 @@ func NewRevisionService(
        answerRepo answercommon.AnswerRepo,
        tagRepo tag_common.TagRepo,
        tagCommon *tag_common.TagCommonService,
-       notificationQueueService notice_queue.NotificationQueueService,
-       activityQueueService activity_queue.ActivityQueueService,
+       notificationQueueService noticequeue.Service,
+       activityQueueService activityqueue.Service,
        reportRepo report_common.ReportRepo,
        reviewService *review.ReviewService,
        reviewActivity activity.ReviewActivityRepo,
diff --git a/internal/service/content/user_service.go b/internal/service/content/user_service.go
index e9cc3578..711d6caa 100644
--- a/internal/service/content/user_service.go
+++ b/internal/service/content/user_service.go
@@ -25,7 +25,7 @@ import (
        "fmt"
        "time"
 
-       "github.com/apache/answer/internal/service/event_queue"
+       "github.com/apache/answer/internal/service/eventqueue"
        "github.com/apache/answer/pkg/token"
 
        "github.com/apache/answer/internal/base/constant"
@@ -68,7 +68,7 @@ type UserService struct {
        userNotificationConfigRepo    user_notification_config.UserNotificationConfigRepo
        userNotificationConfigService *user_notification_config.UserNotificationConfigService
        questionService               *questioncommon.QuestionCommon
-       eventQueueService             event_queue.EventQueueService
+       eventQueueService             eventqueue.Service
        fileRecordService             *file_record.FileRecordService
 }
 
@@ -84,7 +84,7 @@ func NewUserService(userRepo usercommon.UserRepo,
        userNotificationConfigRepo user_notification_config.UserNotificationConfigRepo,
        userNotificationConfigService *user_notification_config.UserNotificationConfigService,
        questionService *questioncommon.QuestionCommon,
-       eventQueueService event_queue.EventQueueService,
+       eventQueueService eventqueue.Service,
        fileRecordService *file_record.FileRecordService,
 ) *UserService {
        return &UserService{
diff --git a/internal/service/content/vote_service.go b/internal/service/content/vote_service.go
index aa615049..1f74769f 100644
--- a/internal/service/content/vote_service.go
+++ b/internal/service/content/vote_service.go
@@ -24,7 +24,7 @@ import (
        "fmt"
        "strings"
 
-       "github.com/apache/answer/internal/service/event_queue"
+       "github.com/apache/answer/internal/service/eventqueue"
 
        "github.com/apache/answer/internal/base/constant"
        "github.com/apache/answer/internal/base/handler"
@@ -62,7 +62,7 @@ type VoteService struct {
        answerRepo        answercommon.AnswerRepo
        commentCommonRepo comment_common.CommentCommonRepo
        objectService     *object_info.ObjService
-       eventQueueService event_queue.EventQueueService
+       eventQueueService eventqueue.Service
 }
 
 func NewVoteService(
@@ -72,7 +72,7 @@ func NewVoteService(
        answerRepo answercommon.AnswerRepo,
        commentCommonRepo comment_common.CommentCommonRepo,
        objectService *object_info.ObjService,
-       eventQueueService event_queue.EventQueueService,
+       eventQueueService eventqueue.Service,
 ) *VoteService {
        return &VoteService{
                voteRepo:          voteRepo,
diff --git a/internal/service/event_queue/event_queue.go b/internal/service/event_queue/event_queue.go
deleted file mode 100644
index 77dc302b..00000000
--- a/internal/service/event_queue/event_queue.go
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package event_queue
-
-import (
-       "context"
-
-       "github.com/apache/answer/internal/schema"
-       "github.com/segmentfault/pacman/log"
-)
-
-type EventQueueService interface {
-       Send(ctx context.Context, msg *schema.EventMsg)
-       RegisterHandler(handler func(ctx context.Context, msg *schema.EventMsg) error)
-}
-
-type eventQueueService struct {
-       Queue   chan *schema.EventMsg
-       Handler func(ctx context.Context, msg *schema.EventMsg) error
-}
-
-func (ns *eventQueueService) Send(ctx context.Context, msg *schema.EventMsg) {
-       ns.Queue <- msg
-}
-
-func (ns *eventQueueService) RegisterHandler(
-       handler func(ctx context.Context, msg *schema.EventMsg) error) {
-       ns.Handler = handler
-}
-
-func (ns *eventQueueService) working() {
-       go func() {
-               for msg := range ns.Queue {
-                       log.Debugf("received badge %+v", msg)
-                       if ns.Handler == nil {
-                               log.Warnf("no handler for badge")
-                               continue
-                       }
-                       if err := ns.Handler(context.Background(), msg); err != nil {
-                               log.Error(err)
-                       }
-               }
-       }()
-}
-
-// NewEventQueueService create a new badge queue service
-func NewEventQueueService() EventQueueService {
-       ns := &eventQueueService{}
-       ns.Queue = make(chan *schema.EventMsg, 128)
-       ns.working()
-       return ns
-}
diff --git a/internal/service/eventqueue/event_queue.go b/internal/service/eventqueue/event_queue.go
new file mode 100644
index 00000000..8d3a2239
--- /dev/null
+++ b/internal/service/eventqueue/event_queue.go
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package eventqueue
+
+import (
+       "github.com/apache/answer/internal/base/queue"
+       "github.com/apache/answer/internal/schema"
+)
+
+type Service = *queue.Queue[*schema.EventMsg]
+
+func NewService() Service {
+       return queue.New[*schema.EventMsg]("event", 128)
+}
diff --git a/internal/service/meta/meta_service.go b/internal/service/meta/meta_service.go
index c1ca7c61..e48e8f46 100644
--- a/internal/service/meta/meta_service.go
+++ b/internal/service/meta/meta_service.go
@@ -26,7 +26,7 @@ import (
        "strconv"
        "strings"
 
-       "github.com/apache/answer/internal/service/event_queue"
+       "github.com/apache/answer/internal/service/eventqueue"
 
        "github.com/apache/answer/internal/base/constant"
        "github.com/apache/answer/internal/base/handler"
@@ -48,7 +48,7 @@ type MetaService struct {
        userCommon        *usercommon.UserCommon
        questionRepo      questioncommon.QuestionRepo
        answerRepo        answercommon.AnswerRepo
-       eventQueueService event_queue.EventQueueService
+       eventQueueService eventqueue.Service
 }
 
 func NewMetaService(
@@ -56,7 +56,7 @@ func NewMetaService(
        userCommon *usercommon.UserCommon,
        answerRepo answercommon.AnswerRepo,
        questionRepo questioncommon.QuestionRepo,
-       eventQueueService event_queue.EventQueueService,
+       eventQueueService eventqueue.Service,
 ) *MetaService {
        return &MetaService{
                metaCommonService: metaCommonService,
diff --git a/internal/service/notice_queue/external_notification_queue.go b/internal/service/notice_queue/external_notification_queue.go
deleted file mode 100644
index 6322a77e..00000000
--- a/internal/service/notice_queue/external_notification_queue.go
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package notice_queue
-
-import (
-       "context"
-
-       "github.com/apache/answer/internal/schema"
-       "github.com/segmentfault/pacman/log"
-)
-
-type ExternalNotificationQueueService interface {
-       Send(ctx context.Context, msg *schema.ExternalNotificationMsg)
-       RegisterHandler(handler func(ctx context.Context, msg *schema.ExternalNotificationMsg) error)
-}
-
-type externalNotificationQueueService struct {
-       Queue   chan *schema.ExternalNotificationMsg
-       Handler func(ctx context.Context, msg *schema.ExternalNotificationMsg) error
-}
-
-func (ns *externalNotificationQueueService) Send(ctx context.Context, msg *schema.ExternalNotificationMsg) {
-       ns.Queue <- msg
-}
-
-func (ns *externalNotificationQueueService) RegisterHandler(
-       handler func(ctx context.Context, msg *schema.ExternalNotificationMsg) error) {
-       ns.Handler = handler
-}
-
-func (ns *externalNotificationQueueService) working() {
-       go func() {
-               for msg := range ns.Queue {
-                       log.Debugf("received notification %+v", msg)
-                       if ns.Handler == nil {
-                               log.Warnf("no handler for notification")
-                               continue
-                       }
-                       if err := ns.Handler(context.Background(), msg); err != nil {
-                               log.Error(err)
-                       }
-               }
-       }()
-}
-
-// NewNewQuestionNotificationQueueService create a new notification queue service
-func NewNewQuestionNotificationQueueService() ExternalNotificationQueueService {
-       ns := &externalNotificationQueueService{}
-       ns.Queue = make(chan *schema.ExternalNotificationMsg, 128)
-       ns.working()
-       return ns
-}
diff --git a/internal/service/notice_queue/notice_queue.go b/internal/service/notice_queue/notice_queue.go
deleted file mode 100644
index 22b733e3..00000000
--- a/internal/service/notice_queue/notice_queue.go
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package notice_queue
-
-import (
-       "context"
-
-       "github.com/apache/answer/internal/schema"
-       "github.com/segmentfault/pacman/log"
-)
-
-type NotificationQueueService interface {
-       Send(ctx context.Context, msg *schema.NotificationMsg)
-       RegisterHandler(handler func(ctx context.Context, msg 
*schema.NotificationMsg) error)
-}
-
-type notificationQueueService struct {
-       Queue   chan *schema.NotificationMsg
-       Handler func(ctx context.Context, msg *schema.NotificationMsg) error
-}
-
-func (ns *notificationQueueService) Send(ctx context.Context, msg 
*schema.NotificationMsg) {
-       ns.Queue <- msg
-}
-
-func (ns *notificationQueueService) RegisterHandler(
-       handler func(ctx context.Context, msg *schema.NotificationMsg) error) {
-       ns.Handler = handler
-}
-
-func (ns *notificationQueueService) working() {
-       go func() {
-               for msg := range ns.Queue {
-                       log.Debugf("received notification %+v", msg)
-                       if ns.Handler == nil {
-                               log.Warnf("no handler for notification")
-                               continue
-                       }
-                       if err := ns.Handler(context.Background(), msg); err != 
nil {
-                               log.Error(err)
-                       }
-               }
-       }()
-}
-
-// NewNotificationQueueService create a new notification queue service
-func NewNotificationQueueService() NotificationQueueService {
-       ns := &notificationQueueService{}
-       ns.Queue = make(chan *schema.NotificationMsg, 128)
-       ns.working()
-       return ns
-}
diff --git a/internal/service/noticequeue/notice_queue.go 
b/internal/service/noticequeue/notice_queue.go
new file mode 100644
index 00000000..138f9ce6
--- /dev/null
+++ b/internal/service/noticequeue/notice_queue.go
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package noticequeue
+
+import (
+       "github.com/apache/answer/internal/base/queue"
+       "github.com/apache/answer/internal/schema"
+)
+
+type Service = *queue.Queue[*schema.NotificationMsg]
+
+func NewService() Service {
+       return queue.New[*schema.NotificationMsg]("notification", 128)
+}
+
+type ExternalService = *queue.Queue[*schema.ExternalNotificationMsg]
+
+func NewExternalService() ExternalService {
+       return 
queue.New[*schema.ExternalNotificationMsg]("external_notification", 128)
+}
diff --git a/internal/service/notification/external_notification.go 
b/internal/service/notification/external_notification.go
index d6bdd2fb..425a8c2b 100644
--- a/internal/service/notification/external_notification.go
+++ b/internal/service/notification/external_notification.go
@@ -28,7 +28,7 @@ import (
        "github.com/apache/answer/internal/schema"
        "github.com/apache/answer/internal/service/activity_common"
        "github.com/apache/answer/internal/service/export"
-       "github.com/apache/answer/internal/service/notice_queue"
+       "github.com/apache/answer/internal/service/noticequeue"
        "github.com/apache/answer/internal/service/siteinfo_common"
        usercommon "github.com/apache/answer/internal/service/user_common"
        "github.com/apache/answer/internal/service/user_external_login"
@@ -42,7 +42,7 @@ type ExternalNotificationService struct {
        followRepo                 activity_common.FollowRepo
        emailService               *export.EmailService
        userRepo                   usercommon.UserRepo
-       notificationQueueService   notice_queue.ExternalNotificationQueueService
+       notificationQueueService   noticequeue.ExternalService
        userExternalLoginRepo      user_external_login.UserExternalLoginRepo
        siteInfoService            siteinfo_common.SiteInfoCommonService
 }
@@ -53,7 +53,7 @@ func NewExternalNotificationService(
        followRepo activity_common.FollowRepo,
        emailService *export.EmailService,
        userRepo usercommon.UserRepo,
-       notificationQueueService notice_queue.ExternalNotificationQueueService,
+       notificationQueueService noticequeue.ExternalService,
        userExternalLoginRepo user_external_login.UserExternalLoginRepo,
        siteInfoService siteinfo_common.SiteInfoCommonService,
 ) *ExternalNotificationService {
diff --git a/internal/service/notification_common/notification.go 
b/internal/service/notification_common/notification.go
index 94ce86b1..aa3f4106 100644
--- a/internal/service/notification_common/notification.go
+++ b/internal/service/notification_common/notification.go
@@ -35,7 +35,7 @@ import (
        "github.com/apache/answer/internal/entity"
        "github.com/apache/answer/internal/schema"
        "github.com/apache/answer/internal/service/activity_common"
-       "github.com/apache/answer/internal/service/notice_queue"
+       "github.com/apache/answer/internal/service/noticequeue"
        "github.com/apache/answer/internal/service/object_info"
        usercommon "github.com/apache/answer/internal/service/user_common"
        "github.com/apache/answer/pkg/uid"
@@ -66,7 +66,7 @@ type NotificationCommon struct {
        followRepo               activity_common.FollowRepo
        userCommon               *usercommon.UserCommon
        objectInfoService        *object_info.ObjService
-       notificationQueueService notice_queue.NotificationQueueService
+       notificationQueueService noticequeue.Service
        userExternalLoginRepo    user_external_login.UserExternalLoginRepo
        siteInfoService          siteinfo_common.SiteInfoCommonService
 }
@@ -78,7 +78,7 @@ func NewNotificationCommon(
        activityRepo activity_common.ActivityRepo,
        followRepo activity_common.FollowRepo,
        objectInfoService *object_info.ObjService,
-       notificationQueueService notice_queue.NotificationQueueService,
+       notificationQueueService noticequeue.Service,
        userExternalLoginRepo user_external_login.UserExternalLoginRepo,
        siteInfoService siteinfo_common.SiteInfoCommonService,
 ) *NotificationCommon {
diff --git a/internal/service/provider.go b/internal/service/provider.go
index 65535f41..f6d95470 100644
--- a/internal/service/provider.go
+++ b/internal/service/provider.go
@@ -23,7 +23,7 @@ import (
        "github.com/apache/answer/internal/service/action"
        "github.com/apache/answer/internal/service/activity"
        "github.com/apache/answer/internal/service/activity_common"
-       "github.com/apache/answer/internal/service/activity_queue"
+       "github.com/apache/answer/internal/service/activityqueue"
        answercommon "github.com/apache/answer/internal/service/answer_common"
        "github.com/apache/answer/internal/service/auth"
        "github.com/apache/answer/internal/service/badge"
@@ -34,14 +34,14 @@ import (
        "github.com/apache/answer/internal/service/config"
        "github.com/apache/answer/internal/service/content"
        "github.com/apache/answer/internal/service/dashboard"
-       "github.com/apache/answer/internal/service/event_queue"
+       "github.com/apache/answer/internal/service/eventqueue"
        "github.com/apache/answer/internal/service/export"
        "github.com/apache/answer/internal/service/file_record"
        "github.com/apache/answer/internal/service/follow"
        "github.com/apache/answer/internal/service/importer"
        "github.com/apache/answer/internal/service/meta"
        metacommon "github.com/apache/answer/internal/service/meta_common"
-       "github.com/apache/answer/internal/service/notice_queue"
+       "github.com/apache/answer/internal/service/noticequeue"
        "github.com/apache/answer/internal/service/notification"
        notficationcommon 
"github.com/apache/answer/internal/service/notification_common"
        "github.com/apache/answer/internal/service/object_info"
@@ -114,14 +114,14 @@ var ProviderSetService = wire.NewSet(
        user_external_login.NewUserCenterLoginService,
        plugin_common.NewPluginCommonService,
        config.NewConfigService,
-       notice_queue.NewNotificationQueueService,
-       activity_queue.NewActivityQueueService,
+       noticequeue.NewService,
+       activityqueue.NewService,
        user_notification_config.NewUserNotificationConfigService,
        notification.NewExternalNotificationService,
-       notice_queue.NewNewQuestionNotificationQueueService,
+       noticequeue.NewExternalService,
        review.NewReviewService,
        meta.NewMetaService,
-       event_queue.NewEventQueueService,
+       eventqueue.NewService,
        badge.NewBadgeService,
        badge.NewBadgeEventService,
        badge.NewBadgeAwardService,
diff --git a/internal/service/question_common/question.go 
b/internal/service/question_common/question.go
index 846dea89..557a5db1 100644
--- a/internal/service/question_common/question.go
+++ b/internal/service/question_common/question.go
@@ -34,7 +34,7 @@ import (
        "github.com/apache/answer/internal/base/handler"
        "github.com/apache/answer/internal/base/reason"
        "github.com/apache/answer/internal/service/activity_common"
-       "github.com/apache/answer/internal/service/activity_queue"
+       "github.com/apache/answer/internal/service/activityqueue"
        "github.com/apache/answer/internal/service/config"
        metacommon "github.com/apache/answer/internal/service/meta_common"
        "github.com/apache/answer/internal/service/revision"
@@ -103,7 +103,7 @@ type QuestionCommon struct {
        AnswerCommon         *answercommon.AnswerCommon
        metaCommonService    *metacommon.MetaCommonService
        configService        *config.ConfigService
-       activityQueueService activity_queue.ActivityQueueService
+       activityQueueService activityqueue.Service
        revisionRepo         revision.RevisionRepo
        siteInfoService      siteinfo_common.SiteInfoCommonService
        data                 *data.Data
@@ -119,7 +119,7 @@ func NewQuestionCommon(questionRepo QuestionRepo,
        answerCommon *answercommon.AnswerCommon,
        metaCommonService *metacommon.MetaCommonService,
        configService *config.ConfigService,
-       activityQueueService activity_queue.ActivityQueueService,
+       activityQueueService activityqueue.Service,
        revisionRepo revision.RevisionRepo,
        siteInfoService siteinfo_common.SiteInfoCommonService,
        data *data.Data,
diff --git a/internal/service/report/report_service.go 
b/internal/service/report/report_service.go
index d32ccdab..84c15d59 100644
--- a/internal/service/report/report_service.go
+++ b/internal/service/report/report_service.go
@@ -22,7 +22,7 @@ package report
 import (
        "encoding/json"
 
-       "github.com/apache/answer/internal/service/event_queue"
+       "github.com/apache/answer/internal/service/eventqueue"
 
        "github.com/apache/answer/internal/base/constant"
        "github.com/apache/answer/internal/base/handler"
@@ -57,7 +57,7 @@ type ReportService struct {
        commentCommonRepo comment_common.CommentCommonRepo
        reportHandle      *report_handle.ReportHandle
        configService     *config.ConfigService
-       eventQueueService event_queue.EventQueueService
+       eventQueueService eventqueue.Service
 }
 
 // NewReportService new report service
@@ -70,7 +70,7 @@ func NewReportService(
        commentCommonRepo comment_common.CommentCommonRepo,
        reportHandle *report_handle.ReportHandle,
        configService *config.ConfigService,
-       eventQueueService event_queue.EventQueueService,
+       eventQueueService eventqueue.Service,
 ) *ReportService {
        return &ReportService{
                reportRepo:        reportRepo,
diff --git a/internal/service/review/review_service.go 
b/internal/service/review/review_service.go
index a23b9ee4..bbb14289 100644
--- a/internal/service/review/review_service.go
+++ b/internal/service/review/review_service.go
@@ -29,7 +29,7 @@ import (
        "github.com/apache/answer/internal/schema"
        answercommon "github.com/apache/answer/internal/service/answer_common"
        commentcommon "github.com/apache/answer/internal/service/comment_common"
-       "github.com/apache/answer/internal/service/notice_queue"
+       "github.com/apache/answer/internal/service/noticequeue"
        "github.com/apache/answer/internal/service/object_info"
        questioncommon 
"github.com/apache/answer/internal/service/question_common"
        "github.com/apache/answer/internal/service/role"
@@ -66,8 +66,8 @@ type ReviewService struct {
        userRoleService                  *role.UserRoleRelService
        tagCommon                        *tagcommon.TagCommonService
        questionCommon                   *questioncommon.QuestionCommon
-       externalNotificationQueueService 
notice_queue.ExternalNotificationQueueService
-       notificationQueueService         notice_queue.NotificationQueueService
+       externalNotificationQueueService noticequeue.ExternalService
+       notificationQueueService         noticequeue.Service
        siteInfoService                  siteinfo_common.SiteInfoCommonService
        commentCommonRepo                commentcommon.CommentCommonRepo
 }
@@ -81,10 +81,10 @@ func NewReviewService(
        questionRepo questioncommon.QuestionRepo,
        answerRepo answercommon.AnswerRepo,
        userRoleService *role.UserRoleRelService,
-       externalNotificationQueueService 
notice_queue.ExternalNotificationQueueService,
+       externalNotificationQueueService noticequeue.ExternalService,
        tagCommon *tagcommon.TagCommonService,
        questionCommon *questioncommon.QuestionCommon,
-       notificationQueueService notice_queue.NotificationQueueService,
+       notificationQueueService noticequeue.Service,
        siteInfoService siteinfo_common.SiteInfoCommonService,
        commentCommonRepo commentcommon.CommentCommonRepo,
 ) *ReviewService {
diff --git a/internal/service/tag/tag_service.go 
b/internal/service/tag/tag_service.go
index e61bfa06..640f06b6 100644
--- a/internal/service/tag/tag_service.go
+++ b/internal/service/tag/tag_service.go
@@ -25,7 +25,7 @@ import (
        "strings"
 
        "github.com/apache/answer/internal/base/constant"
-       "github.com/apache/answer/internal/service/activity_queue"
+       "github.com/apache/answer/internal/service/activityqueue"
        "github.com/apache/answer/internal/service/revision_common"
        "github.com/apache/answer/internal/service/siteinfo_common"
        tagcommonser "github.com/apache/answer/internal/service/tag_common"
@@ -50,7 +50,7 @@ type TagService struct {
        revisionService      *revision_common.RevisionService
        followCommon         activity_common.FollowRepo
        siteInfoService      siteinfo_common.SiteInfoCommonService
-       activityQueueService activity_queue.ActivityQueueService
+       activityQueueService activityqueue.Service
 }
 
 // NewTagService new tag service
@@ -60,7 +60,7 @@ func NewTagService(
        revisionService *revision_common.RevisionService,
        followCommon activity_common.FollowRepo,
        siteInfoService siteinfo_common.SiteInfoCommonService,
-       activityQueueService activity_queue.ActivityQueueService,
+       activityQueueService activityqueue.Service,
 ) *TagService {
        return &TagService{
                tagRepo:              tagRepo,
diff --git a/internal/service/tag_common/tag_common.go 
b/internal/service/tag_common/tag_common.go
index 87c10bcc..9ca8e100 100644
--- a/internal/service/tag_common/tag_common.go
+++ b/internal/service/tag_common/tag_common.go
@@ -33,7 +33,7 @@ import (
        "github.com/apache/answer/internal/base/validator"
        "github.com/apache/answer/internal/entity"
        "github.com/apache/answer/internal/schema"
-       "github.com/apache/answer/internal/service/activity_queue"
+       "github.com/apache/answer/internal/service/activityqueue"
        "github.com/apache/answer/internal/service/revision_common"
        "github.com/apache/answer/internal/service/siteinfo_common"
        "github.com/apache/answer/pkg/converter"
@@ -89,7 +89,7 @@ type TagCommonService struct {
        tagRelRepo           TagRelRepo
        tagRepo              TagRepo
        siteInfoService      siteinfo_common.SiteInfoCommonService
-       activityQueueService activity_queue.ActivityQueueService
+       activityQueueService activityqueue.Service
 }
 
 // NewTagCommonService new tag service
@@ -99,7 +99,7 @@ func NewTagCommonService(
        tagRepo TagRepo,
        revisionService *revision_common.RevisionService,
        siteInfoService siteinfo_common.SiteInfoCommonService,
-       activityQueueService activity_queue.ActivityQueueService,
+       activityQueueService activityqueue.Service,
 ) *TagCommonService {
        return &TagCommonService{
                tagCommonRepo:        tagCommonRepo,

Reply via email to