This is an automated email from the ASF dual-hosted git repository. LinkinStars pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/answer.git
commit 5d923f8ca32edaade5377bf0e9200288e0823b9b Author: hgaol <[email protected]> AuthorDate: Tue Apr 21 23:14:43 2026 +0800 feat: add vector sync service and integrate with answer and question services --- Makefile | 15 ++- VECTOR_SEARCH_DESIGN.md | 142 +++++++++++++++++++++++++++ cmd/wire_gen.go | 10 +- docs/docs.go | 19 ---- docs/swagger.yaml | 17 ---- internal/repo/answer/answer_repo.go | 18 ++-- internal/repo/vector_search_sync/syncer.go | 44 +++++++++ internal/service/answer_common/answer.go | 1 + internal/service/comment/comment_service.go | 32 ++++++ internal/service/content/answer_service.go | 28 ++++++ internal/service/content/question_service.go | 15 +++ internal/service/mock/siteinfo_repo_mock.go | 19 ---- internal/service/provider.go | 2 + internal/service/review/review_service.go | 17 ++++ internal/service/vector_sync/vector_sync.go | 115 ++++++++++++++++++++++ 15 files changed, 425 insertions(+), 69 deletions(-) diff --git a/Makefile b/Makefile index 9bba4ee5..56af4b20 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: build clean ui +.PHONY: build clean ui build-local-vector-plugins VERSION=2.0.0 BIN=answer @@ -68,3 +68,16 @@ lint-fix: generate $(GOLANGCI) $(GOLANGCI) run --fix all: clean build + +build-local-vector-plugins: + @cd new_answer/answer_with_plugin && \ + CGO_ENABLED=0 go build -o answer ./... 
&& \ + ANSWER_MODULE=$$(pwd)/../../ ./answer build \ + --with "github.com/apache/answer-plugins/vector-search-pgvector=$$(pwd)/../../../answer-plugins/vector-search-pgvector" \ + --with "github.com/apache/answer-plugins/vector-search-elasticsearch=$$(pwd)/../../../answer-plugins/vector-search-elasticsearch" \ + --with "github.com/apache/answer-plugins/vector-search-weaviate=$$(pwd)/../../../answer-plugins/vector-search-weaviate" \ + --with "github.com/apache/answer-plugins/vector-search-milvus=$$(pwd)/../../../answer-plugins/vector-search-milvus" \ + --with "github.com/apache/answer-plugins/vector-search-qdrant=$$(pwd)/../../../answer-plugins/vector-search-qdrant" \ + --with "github.com/apache/answer-plugins/vector-search-chromadb=$$(pwd)/../../../answer-plugins/vector-search-chromadb" \ + --with "github.com/apache/answer-plugins/vector-search-memory=$$(pwd)/../../../answer-plugins/vector-search-memory" \ + --output ./new_answer_with_plugins diff --git a/VECTOR_SEARCH_DESIGN.md b/VECTOR_SEARCH_DESIGN.md new file mode 100644 index 00000000..af83e875 --- /dev/null +++ b/VECTOR_SEARCH_DESIGN.md @@ -0,0 +1,142 @@ +# Vector Search & Semantic Search Design + +## Architecture Layers + +``` +┌──────────────────────────────────────────────┐ +│ AI Chat / MCP Tool ("semantic_search") │ ← Controller layer +├──────────────────────────────────────────────┤ +│ EmbeddingService │ ← Service layer (thin facade) +├──────────────────────────────────────────────┤ +│ plugin.VectorSearch interface │ ← Plugin abstraction +├──────────────────────────────────────────────┤ +│ pgvector / elasticsearch / weaviate / ... 
│ ← Plugin implementations +└──────────────────────────────────────────────┘ +``` + +## Plugin Interface (`plugin/vector_search.go`) + +- `RegisterSyncer(ctx, syncer)` — core provides a syncer for bulk data pull +- `SearchSimilar(ctx, query, topK)` — returns `[]VectorSearchResult{ObjectID, ObjectType, Metadata, Score}` +- `UpdateContent(ctx, content)` — upserts a document with embedding +- `DeleteContent(ctx, objectID)` — removes a document +- `ConfigReceiver(config)` / `ConfigFields()` — plugin config lifecycle + +`GenerateEmbedding()` is the shared embedding utility used by plugins. + +## Content Syncing (`vector_search_sync/syncer.go`) + +Core implements `VectorSearchSyncer` with: + +- `GetQuestionsPage(page, pageSize)` +- `GetAnswersPage(page, pageSize)` + +Each indexed document aggregates question/answer/comment text. Metadata stores deshortened IDs for reconstruction at query time. + +Sync is triggered from `RegisterSyncer()` (startup + config update flow). + +## Startup & Activation Flow + +``` +initPluginData(): + 1. Load plugin status from DB + 2. Call ConfigReceiver for configured plugins + -> parse config always + -> if active: run heavy init (probe embedding + connect/schema checks) + -> if inactive: skip heavy init (IsEnabled guard) + 3. Call RegisterSyncer for vector search plugins + -> if active/initialized: trigger full sync + -> if inactive/uninitialized: skip sync +``` + +On admin config save: +1. `ConfigReceiver` +2. `UpdatePluginConfig` -> `RegisterSyncer` -> full sync + +### Current Behavior Summary + +- **Active plugin on startup**: does one probe embedding call, then full sync to vector storage. +- **Inactive plugin on startup**: parses config only; no probe embedding and no sync. +- **Config save for active plugin**: re-runs init path and full sync. 
+ +## Semantic Search Query Flow + +``` +User query -> MCP tool "semantic_search" + -> EmbeddingService.SearchSimilar(query, topK) + -> plugin.SearchSimilar() returns scored IDs + metadata + -> handler fetches full DB content (question/answers/comments) + -> returns structured semantic search response +``` + +## Follow-up: Real-Time Sync Gap + +### Comparison with Search Plugin + +| Aspect | Search Plugin | VectorSearch Plugin | +|--------|---------------|---------------------| +| Bulk sync | Yes | Yes | +| Real-time sync | Yes (create/update/delete hooks) | No | +| Trigger | Event-driven + startup/config | Startup/config only | +| Consistency | Near real-time | Stale until next full re-sync | + +### Current Gap + +`UpdateContent()` / `DeleteContent()` exist in `plugin.VectorSearch`, but are not called from question/answer service events. As a result, after the initial sync, content changes are not reflected until the next full re-sync. (Note: this change adds a `vector_sync` service, and the question, answer, comment, and review services now send `vector_sync` upsert/delete tasks on content events; the comparison table above and the "Manual (current)" option below describe the behavior before that wiring.) + +### Options + +1. **Manual (current)** + - Re-sync only on plugin config save/update + - Simple, but stale results between syncs + +2. **Real-time** + - Add event hooks to call vector search update/delete + - Can be async (goroutine / queue) to avoid write-path latency + - Higher embedding API call volume + +3. **Scheduled (cron)** + - Periodic bulk sync via cron expression + - Good for off-peak syncing + - Delayed freshness until next run + +### Proposed Common Setting (All Vector Search Plugins) + +Add a **single common sync policy setting** at the vector-search framework level (not per plugin implementation), so all vector search plugins behave consistently. + +#### New common config fields + +- `sync_mode`: enum + - `manual` + - `realtime` + - `scheduled` +- `sync_crontab`: string (required only when `sync_mode=scheduled`) + +#### Behavior by mode + +- `manual` + - No runtime incremental sync. + - Full sync only when plugin config is saved/updated (current behavior).
+ +- `realtime` + - Hook question/answer create/update/delete events in core service layer. + - Trigger `VectorSearch.UpdateContent` / `VectorSearch.DeleteContent` for the active plugin. + - Recommend async dispatch (queue/worker) to avoid request-path latency. + +- `scheduled` + - Core scheduler triggers periodic full sync (`RegisterSyncer`/sync routine) by `sync_crontab`. + - Intended for off-peak indexing windows. + +#### Design constraints + +- This setting should be **core-owned and plugin-agnostic**. +- Individual vector plugins should not implement their own sync-mode semantics. +- Core decides when to call plugin sync/update APIs; plugins only execute storage/index operations. +- Switching modes should apply immediately after config save. +- Inactive plugins remain parse-only (no heavy init/sync). + +#### Why common instead of per-plugin + +- Consistent admin UX and operational behavior. +- Prevents drift where plugins implement different sync semantics. +- Easier future enhancements (retry, dead-letter queue, backfill controls, observability) in one place. 
diff --git a/cmd/wire_gen.go b/cmd/wire_gen.go index 3ead33fb..7a8176ea 100644 --- a/cmd/wire_gen.go +++ b/cmd/wire_gen.go @@ -120,6 +120,7 @@ import ( "github.com/apache/answer/internal/service/user_common" user_external_login2 "github.com/apache/answer/internal/service/user_external_login" user_notification_config2 "github.com/apache/answer/internal/service/user_notification_config" + "github.com/apache/answer/internal/service/vector_sync" "github.com/segmentfault/pacman" "github.com/segmentfault/pacman/log" ) @@ -199,9 +200,10 @@ func initApplication(debug bool, serverConf *conf.Server, dbConf *data.Database, objService := object_info.NewObjService(answerRepo, questionRepo, commentCommonRepo, tagCommonRepo, tagCommonService) noticequeueService := noticequeue.NewService() externalService := noticequeue.NewExternalService() + vectorSyncService := vector_sync.NewService(dataData) reviewRepo := review.NewReviewRepo(dataData) - reviewService := review2.NewReviewService(reviewRepo, objService, userCommon, userRepo, questionRepo, answerRepo, userRoleRelService, externalService, tagCommonService, questionCommon, noticequeueService, siteInfoCommonService, commentCommonRepo) - commentService := comment2.NewCommentService(commentRepo, commentCommonRepo, userCommon, objService, voteRepo, emailService, userRepo, noticequeueService, externalService, service, eventqueueService, reviewService) + reviewService := review2.NewReviewService(reviewRepo, objService, userCommon, userRepo, questionRepo, answerRepo, userRoleRelService, externalService, tagCommonService, questionCommon, noticequeueService, siteInfoCommonService, commentCommonRepo, vectorSyncService) + commentService := comment2.NewCommentService(commentRepo, commentCommonRepo, userCommon, objService, voteRepo, emailService, userRepo, noticequeueService, externalService, service, eventqueueService, reviewService, vectorSyncService) rolePowerRelRepo := role.NewRolePowerRelRepo(dataData) rolePowerRelService := 
role2.NewRolePowerRelService(rolePowerRelRepo, userRoleRelService) rankService := rank2.NewRankService(userCommon, userRankRepo, objService, userRoleRelService, rolePowerRelService, configService) @@ -213,8 +215,8 @@ func initApplication(debug bool, serverConf *conf.Server, dbConf *data.Database, answerActivityRepo := activity.NewAnswerActivityRepo(dataData, activityRepo, userRankRepo, noticequeueService) answerActivityService := activity2.NewAnswerActivityService(answerActivityRepo, configService) externalNotificationService := notification.NewExternalNotificationService(dataData, userNotificationConfigRepo, followRepo, emailService, userRepo, externalService, userExternalLoginRepo, siteInfoCommonService) - questionService := content.NewQuestionService(activityRepo, questionRepo, answerRepo, tagCommonService, tagService, questionCommon, userCommon, userRepo, userRoleRelService, revisionService, metaCommonService, collectionCommon, answerActivityService, emailService, noticequeueService, externalService, service, siteInfoCommonService, externalNotificationService, reviewService, configService, eventqueueService, reviewRepo) - answerService := content.NewAnswerService(answerRepo, questionRepo, questionCommon, userCommon, collectionCommon, userRepo, revisionService, answerActivityService, answerCommon, voteRepo, emailService, userRoleRelService, noticequeueService, externalService, service, reviewService, eventqueueService) + questionService := content.NewQuestionService(activityRepo, questionRepo, answerRepo, tagCommonService, tagService, questionCommon, userCommon, userRepo, userRoleRelService, revisionService, metaCommonService, collectionCommon, answerActivityService, emailService, noticequeueService, externalService, service, siteInfoCommonService, externalNotificationService, reviewService, configService, eventqueueService, reviewRepo, vectorSyncService) + answerService := content.NewAnswerService(answerRepo, questionRepo, questionCommon, userCommon, 
collectionCommon, userRepo, revisionService, answerActivityService, answerCommon, voteRepo, emailService, userRoleRelService, noticequeueService, externalService, service, reviewService, eventqueueService, vectorSyncService) reportHandle := report_handle.NewReportHandle(questionService, answerService, commentService) reportService := report2.NewReportService(reportRepo, objService, userCommon, answerRepo, questionRepo, commentCommonRepo, reportHandle, configService, eventqueueService) reportController := controller.NewReportController(reportService, rankService, captchaService) diff --git a/docs/docs.go b/docs/docs.go index 57a23d43..7fe1643e 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1,22 +1,3 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - // Package docs Code generated by swaggo/swag. DO NOT EDIT package docs diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 7a7adb68..cc5711b4 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -1,20 +1,3 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. 
The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - basePath: / definitions: constant.NotificationChannelKey: diff --git a/internal/repo/answer/answer_repo.go b/internal/repo/answer/answer_repo.go index 52963c44..de91ed58 100644 --- a/internal/repo/answer/answer_repo.go +++ b/internal/repo/answer/answer_repo.go @@ -80,7 +80,7 @@ func (ar *answerRepo) AddAnswer(ctx context.Context, answer *entity.Answer) (err answer.ID = uid.EnShortID(answer.ID) answer.QuestionID = uid.EnShortID(answer.QuestionID) } - _ = ar.updateSearch(ctx, answer.ID) + _ = ar.UpdateSearch(ctx, answer.ID) return nil } @@ -93,7 +93,7 @@ func (ar *answerRepo) RemoveAnswer(ctx context.Context, answerID string) (err er if err != nil { return errors.InternalServer(reason.DatabaseError).WithError(err).WithStack() } - _ = ar.updateSearch(ctx, answerID) + _ = ar.UpdateSearch(ctx, answerID) return nil } @@ -106,7 +106,7 @@ func (ar *answerRepo) RecoverAnswer(ctx context.Context, answerID string) (err e if err != nil { return errors.InternalServer(reason.DatabaseError).WithError(err).WithStack() } - _ = ar.updateSearch(ctx, answerID) + _ = ar.UpdateSearch(ctx, answerID) return nil } @@ -139,7 +139,7 @@ func (ar *answerRepo) RemoveAllUserAnswer(ctx context.Context, userID string) (e // update search content for _, id := range answerIDs { - _ = ar.updateSearch(ctx, id) + _ = ar.UpdateSearch(ctx, id) } return nil } @@ -152,7 +152,7 @@ func (ar *answerRepo) UpdateAnswer(ctx context.Context, 
answer *entity.Answer, c if err != nil { err = errors.InternalServer(reason.DatabaseError).WithError(err).WithStack() } - _ = ar.updateSearch(ctx, answer.ID) + _ = ar.UpdateSearch(ctx, answer.ID) return err } @@ -162,7 +162,7 @@ func (ar *answerRepo) UpdateAnswerStatus(ctx context.Context, answerID string, s if err != nil { return errors.InternalServer(reason.DatabaseError).WithError(err).WithStack() } - _ = ar.updateSearch(ctx, answerID) + _ = ar.UpdateSearch(ctx, answerID) return } @@ -251,7 +251,7 @@ func (ar *answerRepo) UpdateAcceptedStatus(ctx context.Context, acceptedAnswerID return errors.InternalServer(reason.DatabaseError).WithError(err).WithStack() } } - _ = ar.updateSearch(ctx, acceptedAnswerID) + _ = ar.UpdateSearch(ctx, acceptedAnswerID) return nil } @@ -459,8 +459,8 @@ func (ar *answerRepo) SumVotesByQuestionID(ctx context.Context, questionID strin return count, nil } -// updateSearch update search, if search plugin not enable, do nothing -func (ar *answerRepo) updateSearch(ctx context.Context, answerID string) (err error) { +// UpdateSearch update search, if search plugin not enable, do nothing +func (ar *answerRepo) UpdateSearch(ctx context.Context, answerID string) (err error) { answerID = uid.DeShortID(answerID) // check search plugin var ( diff --git a/internal/repo/vector_search_sync/syncer.go b/internal/repo/vector_search_sync/syncer.go index f27b1fd2..be829e8f 100644 --- a/internal/repo/vector_search_sync/syncer.go +++ b/internal/repo/vector_search_sync/syncer.go @@ -194,3 +194,47 @@ func (p *PluginSyncer) buildAnswerContents(ctx context.Context, answers []*entit } return result, nil } + +// BuildQuestionContentByID builds vector content for one question using the same +// aggregation semantics as bulk vector sync. 
+func BuildQuestionContentByID(ctx context.Context, data *data.Data, questionID string) (*plugin.VectorSearchContent, error) { + question := &entity.Question{} + exist, err := data.DB.Context(ctx).Where("id = ?", uid.DeShortID(questionID)).Get(question) + if err != nil { + return nil, err + } + if !exist { + return nil, nil + } + syncer := &PluginSyncer{data: data} + contents, err := syncer.buildQuestionContents(ctx, []*entity.Question{question}) + if err != nil { + return nil, err + } + if len(contents) == 0 { + return nil, nil + } + return contents[0], nil +} + +// BuildAnswerContentByID builds vector content for one answer using the same +// aggregation semantics as bulk vector sync. +func BuildAnswerContentByID(ctx context.Context, data *data.Data, answerID string) (*plugin.VectorSearchContent, error) { + answer := &entity.Answer{} + exist, err := data.DB.Context(ctx).Where("id = ?", uid.DeShortID(answerID)).Get(answer) + if err != nil { + return nil, err + } + if !exist { + return nil, nil + } + syncer := &PluginSyncer{data: data} + contents, err := syncer.buildAnswerContents(ctx, []*entity.Answer{answer}) + if err != nil { + return nil, err + } + if len(contents) == 0 { + return nil, nil + } + return contents[0], nil +} diff --git a/internal/service/answer_common/answer.go b/internal/service/answer_common/answer.go index 989eb143..1ab66458 100644 --- a/internal/service/answer_common/answer.go +++ b/internal/service/answer_common/answer.go @@ -48,6 +48,7 @@ type AnswerRepo interface { resp []*entity.Answer, total int64, err error) AdminSearchList(ctx context.Context, search *schema.AdminAnswerPageReq) ([]*entity.Answer, int64, error) UpdateAnswerStatus(ctx context.Context, answerID string, status int) (err error) + UpdateSearch(ctx context.Context, answerID string) (err error) GetAnswerCount(ctx context.Context) (count int64, err error) RemoveAllUserAnswer(ctx context.Context, userID string) (err error) SumVotesByQuestionID(ctx context.Context, questionID 
string) (float64, error) diff --git a/internal/service/comment/comment_service.go b/internal/service/comment/comment_service.go index 30ff43c6..cf317fdd 100644 --- a/internal/service/comment/comment_service.go +++ b/internal/service/comment/comment_service.go @@ -40,6 +40,7 @@ import ( "github.com/apache/answer/internal/service/object_info" "github.com/apache/answer/internal/service/permission" usercommon "github.com/apache/answer/internal/service/user_common" + "github.com/apache/answer/internal/service/vector_sync" "github.com/apache/answer/pkg/htmltext" "github.com/apache/answer/pkg/token" "github.com/apache/answer/pkg/uid" @@ -93,6 +94,7 @@ type CommentService struct { activityQueueService activityqueue.Service eventQueueService eventqueue.Service reviewService *review.ReviewService + vectorSyncService vector_sync.Service } // NewCommentService new comment service @@ -109,6 +111,7 @@ func NewCommentService( activityQueueService activityqueue.Service, eventQueueService eventqueue.Service, reviewService *review.ReviewService, + vectorSyncService vector_sync.Service, ) *CommentService { return &CommentService{ commentRepo: commentRepo, @@ -123,6 +126,7 @@ func NewCommentService( activityQueueService: activityQueueService, eventQueueService: eventQueueService, reviewService: reviewService, + vectorSyncService: vectorSyncService, } } @@ -214,6 +218,15 @@ func (cs *CommentService) AddComment(ctx context.Context, req *schema.AddComment } cs.activityQueueService.Send(ctx, activityMsg) cs.eventQueueService.Send(ctx, event) + if comment.Status == entity.CommentStatusAvailable { + switch objInfo.ObjectType { + case constant.QuestionObjectType: + cs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: objInfo.QuestionID}) + case constant.AnswerObjectType: + cs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeAnswer, ObjectID: 
objInfo.AnswerID}) + cs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: objInfo.QuestionID}) + } + } return resp, nil } @@ -264,12 +277,25 @@ func (cs *CommentService) addCommentNotification( // RemoveComment delete comment func (cs *CommentService) RemoveComment(ctx context.Context, req *schema.RemoveCommentReq) (err error) { + commentInfo, exist, err := cs.commentCommonRepo.GetComment(ctx, req.CommentID) + if err != nil { + return err + } + if !exist { + return nil + } err = cs.commentRepo.RemoveComment(ctx, req.CommentID) if err != nil { return err } cs.eventQueueService.Send(ctx, schema.NewEvent(constant.EventCommentDelete, req.UserID). TID(req.CommentID).CID(req.CommentID, req.UserID)) + if commentInfo.ObjectID == commentInfo.QuestionID { + cs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: commentInfo.QuestionID}) + } else { + cs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeAnswer, ObjectID: commentInfo.ObjectID}) + cs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: commentInfo.QuestionID}) + } return nil } @@ -304,6 +330,12 @@ func (cs *CommentService) UpdateComment(ctx context.Context, req *schema.UpdateC } cs.eventQueueService.Send(ctx, schema.NewEvent(constant.EventCommentUpdate, req.UserID).TID(old.ID). 
CID(old.ID, old.UserID)) + if old.ObjectID == old.QuestionID { + cs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: old.QuestionID}) + } else { + cs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeAnswer, ObjectID: old.ObjectID}) + cs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: old.QuestionID}) + } return resp, nil } diff --git a/internal/service/content/answer_service.go b/internal/service/content/answer_service.go index 2ad87517..2adddd6b 100644 --- a/internal/service/content/answer_service.go +++ b/internal/service/content/answer_service.go @@ -43,6 +43,7 @@ import ( "github.com/apache/answer/internal/service/revision_common" "github.com/apache/answer/internal/service/role" usercommon "github.com/apache/answer/internal/service/user_common" + "github.com/apache/answer/internal/service/vector_sync" "github.com/apache/answer/pkg/converter" "github.com/apache/answer/pkg/htmltext" "github.com/apache/answer/pkg/token" @@ -70,6 +71,7 @@ type AnswerService struct { activityQueueService activityqueue.Service reviewService *review.ReviewService eventQueueService eventqueue.Service + vectorSyncService vector_sync.Service } func NewAnswerService( @@ -90,6 +92,7 @@ func NewAnswerService( activityQueueService activityqueue.Service, reviewService *review.ReviewService, eventQueueService eventqueue.Service, + vectorSyncService vector_sync.Service, ) *AnswerService { return &AnswerService{ answerRepo: answerRepo, @@ -109,6 +112,7 @@ func NewAnswerService( activityQueueService: activityQueueService, reviewService: reviewService, eventQueueService: eventQueueService, + vectorSyncService: vectorSyncService, } } @@ -192,6 +196,8 @@ func (as *AnswerService) RemoveAnswer(ctx context.Context, req *schema.RemoveAns }) as.eventQueueService.Send(ctx, 
schema.NewEvent(constant.EventAnswerDelete, req.UserID).TID(answerInfo.ID). AID(answerInfo.ID, answerInfo.UserID)) + as.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionDelete, ObjectType: vector_sync.ObjectTypeAnswer, ObjectID: answerInfo.ID}) + as.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: answerInfo.QuestionID}) return } @@ -239,6 +245,8 @@ func (as *AnswerService) RecoverAnswer(ctx context.Context, req *schema.RecoverA OriginalObjectID: answerInfo.ID, ActivityTypeKey: constant.ActAnswerUndeleted, }) + as.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeAnswer, ObjectID: answerInfo.ID}) + as.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: answerInfo.QuestionID}) return nil } @@ -332,6 +340,10 @@ func (as *AnswerService) Insert(ctx context.Context, req *schema.AnswerAddReq) ( }) as.eventQueueService.Send(ctx, schema.NewEvent(constant.EventAnswerCreate, req.UserID).TID(insertData.ID). AID(insertData.ID, insertData.UserID)) + if insertData.Status == entity.AnswerStatusAvailable { + as.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeAnswer, ObjectID: insertData.ID}) + as.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: insertData.QuestionID}) + } return insertData.ID, nil } @@ -425,6 +437,8 @@ func (as *AnswerService) Update(ctx context.Context, req *schema.AnswerUpdateReq }) as.eventQueueService.Send(ctx, schema.NewEvent(constant.EventAnswerUpdate, req.UserID).TID(insertData.ID). 
AID(insertData.ID, insertData.UserID)) + as.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeAnswer, ObjectID: insertData.ID}) + as.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: insertData.QuestionID}) } return insertData.ID, nil @@ -489,6 +503,13 @@ func (as *AnswerService) AcceptAnswer(ctx context.Context, req *schema.AcceptAns } as.updateAnswerRank(ctx, req.UserID, questionInfo, acceptedAnswerInfo, oldAnswerInfo) + if acceptedAnswerInfo != nil { + as.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeAnswer, ObjectID: acceptedAnswerInfo.ID}) + } + if oldAnswerInfo != nil { + as.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeAnswer, ObjectID: oldAnswerInfo.ID}) + } + as.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: questionInfo.ID}) return nil } @@ -616,6 +637,13 @@ func (as *AnswerService) AdminSetAnswerStatus(ctx context.Context, req *schema.A return err } } + if setStatus == entity.AnswerStatusDeleted { + as.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionDelete, ObjectType: vector_sync.ObjectTypeAnswer, ObjectID: answerInfo.ID}) + as.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: answerInfo.QuestionID}) + } else if setStatus == entity.AnswerStatusAvailable { + as.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeAnswer, ObjectID: answerInfo.ID}) + as.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: answerInfo.QuestionID}) + } return nil } 
diff --git a/internal/service/content/question_service.go b/internal/service/content/question_service.go index bc3ac0bb..8b34fe48 100644 --- a/internal/service/content/question_service.go +++ b/internal/service/content/question_service.go @@ -55,6 +55,7 @@ import ( "github.com/apache/answer/internal/service/tag" tagcommon "github.com/apache/answer/internal/service/tag_common" usercommon "github.com/apache/answer/internal/service/user_common" + "github.com/apache/answer/internal/service/vector_sync" "github.com/apache/answer/pkg/checker" "github.com/apache/answer/pkg/converter" "github.com/apache/answer/pkg/htmltext" @@ -93,6 +94,7 @@ type QuestionService struct { configService *config.ConfigService eventQueueService eventqueue.Service reviewRepo review.ReviewRepo + vectorSyncService vector_sync.Service } func NewQuestionService( @@ -119,6 +121,7 @@ func NewQuestionService( configService *config.ConfigService, eventQueueService eventqueue.Service, reviewRepo review.ReviewRepo, + vectorSyncService vector_sync.Service, ) *QuestionService { return &QuestionService{ activityRepo: activityRepo, @@ -144,6 +147,7 @@ func NewQuestionService( configService: configService, eventQueueService: eventQueueService, reviewRepo: reviewRepo, + vectorSyncService: vectorSyncService, } } @@ -456,6 +460,9 @@ func (qs *QuestionService) AddQuestion(ctx context.Context, req *schema.Question } qs.eventQueueService.Send(ctx, schema.NewEvent(constant.EventQuestionCreate, req.UserID).TID(question.ID). 
QID(question.ID, question.UserID)) + if question.Status == entity.QuestionStatusAvailable { + qs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: question.ID}) + } questionInfo, err = qs.GetQuestion(ctx, question.ID, question.UserID, req.QuestionPermission) return @@ -652,6 +659,7 @@ func (qs *QuestionService) RemoveQuestion(ctx context.Context, req *schema.Remov }) qs.eventQueueService.Send(ctx, schema.NewEvent(constant.EventQuestionDelete, req.UserID).TID(questionInfo.ID). QID(questionInfo.ID, questionInfo.UserID)) + qs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionDelete, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: questionInfo.ID}) return nil } @@ -783,6 +791,7 @@ func (qs *QuestionService) RecoverQuestion(ctx context.Context, req *schema.Ques OriginalObjectID: questionInfo.ID, ActivityTypeKey: constant.ActQuestionUndeleted, }) + qs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: questionInfo.ID}) return nil } @@ -1068,6 +1077,7 @@ func (qs *QuestionService) UpdateQuestion(ctx context.Context, req *schema.Quest }) qs.eventQueueService.Send(ctx, schema.NewEvent(constant.EventQuestionUpdate, req.UserID).TID(question.ID). 
QID(question.ID, question.UserID)) + qs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: question.ID}) } questionInfo, err = qs.GetQuestion(ctx, question.ID, question.UserID, req.QuestionPermission) @@ -1629,6 +1639,11 @@ func (qs *QuestionService) AdminSetQuestionStatus(ctx context.Context, req *sche msg.ObjectType = constant.QuestionObjectType qs.notificationQueueService.Send(ctx, msg) } + if setStatus == entity.QuestionStatusDeleted { + qs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionDelete, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: questionInfo.ID}) + } else if setStatus == entity.QuestionStatusAvailable { + qs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: questionInfo.ID}) + } return nil } diff --git a/internal/service/mock/siteinfo_repo_mock.go b/internal/service/mock/siteinfo_repo_mock.go index ad3a170a..9438be66 100644 --- a/internal/service/mock/siteinfo_repo_mock.go +++ b/internal/service/mock/siteinfo_repo_mock.go @@ -1,22 +1,3 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - // Code generated by MockGen. DO NOT EDIT. // Source: ./siteinfo_service.go // diff --git a/internal/service/provider.go b/internal/service/provider.go index 26f1c430..d848272f 100644 --- a/internal/service/provider.go +++ b/internal/service/provider.go @@ -68,6 +68,7 @@ import ( usercommon "github.com/apache/answer/internal/service/user_common" "github.com/apache/answer/internal/service/user_external_login" "github.com/apache/answer/internal/service/user_notification_config" + "github.com/apache/answer/internal/service/vector_sync" "github.com/google/wire" ) @@ -136,4 +137,5 @@ var ProviderSetService = wire.NewSet( ai_conversation.NewAIConversationService, feature_toggle.NewFeatureToggleService, embedding.NewEmbeddingService, + vector_sync.NewService, ) diff --git a/internal/service/review/review_service.go b/internal/service/review/review_service.go index bbb14289..054e2204 100644 --- a/internal/service/review/review_service.go +++ b/internal/service/review/review_service.go @@ -36,6 +36,7 @@ import ( "github.com/apache/answer/internal/service/siteinfo_common" tagcommon "github.com/apache/answer/internal/service/tag_common" usercommon "github.com/apache/answer/internal/service/user_common" + "github.com/apache/answer/internal/service/vector_sync" "github.com/apache/answer/pkg/htmltext" "github.com/apache/answer/pkg/token" "github.com/apache/answer/pkg/uid" @@ -70,6 +71,7 @@ type ReviewService struct { notificationQueueService noticequeue.Service siteInfoService siteinfo_common.SiteInfoCommonService commentCommonRepo commentcommon.CommentCommonRepo + vectorSyncService vector_sync.Service } // NewReviewService new review service @@ -87,6 +89,7 @@ func NewReviewService( notificationQueueService noticequeue.Service, siteInfoService siteinfo_common.SiteInfoCommonService, commentCommonRepo commentcommon.CommentCommonRepo, + vectorSyncService vector_sync.Service, ) *ReviewService { return &ReviewService{ reviewRepo: reviewRepo, @@ -102,6 +105,7 @@ func 
NewReviewService( notificationQueueService: notificationQueueService, siteInfoService: siteInfoService, commentCommonRepo: commentCommonRepo, + vectorSyncService: vectorSyncService, } } @@ -290,6 +294,9 @@ func (cs *ReviewService) updateObjectStatus(ctx context.Context, review *entity. } cs.externalNotificationQueueService.Send(ctx, schema.CreateNewQuestionNotificationMsg(questionInfo.ID, questionInfo.Title, questionInfo.UserID, tags)) + cs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: questionInfo.ID}) + } else { + cs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionDelete, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: questionInfo.ID}) } userQuestionCount, err := cs.questionRepo.GetUserQuestionCount(ctx, questionInfo.UserID, 0) if err != nil { @@ -326,7 +333,11 @@ func (cs *ReviewService) updateObjectStatus(ctx context.Context, review *entity. if isApprove { cs.notificationAnswerTheQuestion(ctx, questionInfo.UserID, questionInfo.ID, answerInfo.ID, answerInfo.UserID, questionInfo.Title, answerInfo.OriginalText) + cs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeAnswer, ObjectID: answerInfo.ID}) + } else { + cs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionDelete, ObjectType: vector_sync.ObjectTypeAnswer, ObjectID: answerInfo.ID}) } + cs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: answerInfo.QuestionID}) if err := cs.questionCommon.UpdateAnswerCount(ctx, answerInfo.QuestionID); err != nil { log.Errorf("update question answer count failed, err: %v", err) } @@ -368,6 +379,12 @@ func (cs *ReviewService) updateObjectStatus(ctx context.Context, review *entity. 
if isApprove { cs.notificationCommentOnTheQuestion(ctx, commentInfo) } + if commentInfo.ObjectID == commentInfo.QuestionID { + cs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: commentInfo.QuestionID}) + } else { + cs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeAnswer, ObjectID: commentInfo.ObjectID}) + cs.vectorSyncService.Send(ctx, &vector_sync.Task{Action: vector_sync.ActionUpsert, ObjectType: vector_sync.ObjectTypeQuestion, ObjectID: commentInfo.QuestionID}) + } } return } diff --git a/internal/service/vector_sync/vector_sync.go b/internal/service/vector_sync/vector_sync.go new file mode 100644 index 00000000..c7281ad4 --- /dev/null +++ b/internal/service/vector_sync/vector_sync.go @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+// Package vector_sync keeps the vector search plugin's index in step with
+// question and answer changes by processing upsert/delete tasks from a queue.
+package vector_sync
+
+import (
+	"context"
+	"time"
+
+	"github.com/apache/answer/internal/base/data"
+	"github.com/apache/answer/internal/base/queue"
+	"github.com/apache/answer/internal/repo/vector_search_sync"
+	"github.com/apache/answer/pkg/uid"
+	"github.com/apache/answer/plugin"
+	"github.com/segmentfault/pacman/log"
+)
+
+// Values accepted in Task.Action and Task.ObjectType.
+const (
+	ActionUpsert = "upsert"
+	ActionDelete = "delete"
+
+	ObjectTypeQuestion = "question"
+	ObjectTypeAnswer   = "answer"
+)
+
+const (
+	// maxRetry is the number of attempts made for one task before giving up.
+	maxRetry = 3
+	// retryBackoff is the base delay between attempts: the wait before
+	// attempt n (n >= 2) is (n-1)*retryBackoff. It is kept short because the
+	// wait happens inside the queue handler.
+	retryBackoff = 200 * time.Millisecond
+)
+
+// Task describes one change to apply to the vector search index.
+type Task struct {
+	Action     string // ActionUpsert or ActionDelete; other values are ignored
+	ObjectType string // ObjectTypeQuestion or ObjectTypeAnswer (used for upserts)
+	ObjectID   string // object ID; decoded with uid.DeShortID before use
+}
+
+// Service is the queue that vector sync tasks are sent to.
+type Service queue.Service[*Task]
+
+// NewService creates the "vector_sync" queue (constructed with size 128) and
+// registers a handler that applies each Task to the enabled vector search plugin.
+func NewService(d *data.Data) Service {
+	q := queue.New[*Task]("vector_sync", 128)
+	q.RegisterHandler(func(ctx context.Context, msg *Task) error {
+		return handle(ctx, d, msg)
+	})
+	return q
+}
+
+// handle applies one task to the vector search plugin, retrying up to
+// maxRetry times with a linear backoff between attempts.
+//
+// It returns nil when the task is nil, has no ObjectID, or no vector search
+// plugin instance is obtained. If every attempt fails, it returns the last
+// error. If ctx is cancelled while waiting to retry, it returns ctx.Err().
+func handle(ctx context.Context, d *data.Data, msg *Task) error {
+	if msg == nil || msg.ObjectID == "" {
+		return nil
+	}
+
+	var vectorSearch plugin.VectorSearch
+	// Only the plugin instance is needed. The callback never returns an
+	// error, and a nil instance below is treated as "vector search disabled",
+	// so the error from CallVectorSearch is deliberately ignored.
+	_ = plugin.CallVectorSearch(func(vs plugin.VectorSearch) error {
+		vectorSearch = vs
+		return nil
+	})
+	if vectorSearch == nil {
+		return nil
+	}
+
+	objectID := uid.DeShortID(msg.ObjectID)
+	var lastErr error
+	for attempt := 1; attempt <= maxRetry; attempt++ {
+		if attempt > 1 {
+			// Give a briefly unavailable backend time to recover instead of
+			// retrying immediately, and stop early if the context is cancelled.
+			timer := time.NewTimer(time.Duration(attempt-1) * retryBackoff)
+			select {
+			case <-ctx.Done():
+				timer.Stop()
+				return ctx.Err()
+			case <-timer.C:
+			}
+		}
+		err := handleOnce(ctx, d, vectorSearch, msg.Action, msg.ObjectType, objectID)
+		if err == nil {
+			return nil
+		}
+		lastErr = err
+		log.Warnf("vector sync failed: action=%s object_type=%s object_id=%s attempt=%d err=%v",
+			msg.Action, msg.ObjectType, objectID, attempt, err)
+	}
+	return lastErr
+}
+
+// handleOnce makes a single attempt to apply action to the given object.
+//
+// ActionDelete removes objectID from the index. ActionUpsert builds the
+// object's content with the vector_search_sync builders and updates the
+// index. If the builder returns nil content (presumably the object is missing
+// or not indexable — confirm in vector_search_sync), the entry is deleted
+// instead. Unknown actions and object types are ignored and return nil, so
+// they are never retried.
+func handleOnce(ctx context.Context, d *data.Data, vectorSearch plugin.VectorSearch,
+	action, objectType, objectID string) error {
+	if action == ActionDelete {
+		return vectorSearch.DeleteContent(ctx, objectID)
+	}
+	if action != ActionUpsert {
+		return nil
+	}
+
+	var (
+		content *plugin.VectorSearchContent
+		err     error
+	)
+	switch objectType {
+	case ObjectTypeQuestion:
+		content, err = vector_search_sync.BuildQuestionContentByID(ctx, d, objectID)
+	case ObjectTypeAnswer:
+		content, err = vector_search_sync.BuildAnswerContentByID(ctx, d, objectID)
+	default:
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+	if content == nil {
+		return vectorSearch.DeleteContent(ctx, objectID)
+	}
+	return vectorSearch.UpdateContent(ctx, content)
+}
