This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit c11f79a4267ee1229d9afaeb94ee061d0940b39e Author: WenjinXie <[email protected]> AuthorDate: Fri Dec 19 17:57:43 2025 +0800 [api][runtime] Add return value for add and improve summarization prompt. --- python/flink_agents/api/memory/long_term_memory.py | 37 +++++++++++++++++++--- .../memory/vector_store_long_term_memory.py | 17 ++++++++-- 2 files changed, 47 insertions(+), 7 deletions(-) diff --git a/python/flink_agents/api/memory/long_term_memory.py b/python/flink_agents/api/memory/long_term_memory.py index 6373499..7d61182 100644 --- a/python/flink_agents/api/memory/long_term_memory.py +++ b/python/flink_agents/api/memory/long_term_memory.py @@ -30,6 +30,7 @@ from pydantic import ( from typing_extensions import override from flink_agents.api.chat_message import ChatMessage +from flink_agents.api.configuration import ConfigOption from flink_agents.api.prompts.prompt import Prompt ItemType = str | ChatMessage @@ -76,6 +77,28 @@ class LongTermMemoryBackend(Enum): EXTERNAL_VECTOR_STORE = "external_vector_store" +class LongTermMemoryOptions: + """Config options for ReActAgent.""" + + BACKEND = ConfigOption( + key="long-term-memory.", + config_type=LongTermMemoryBackend, + default=None, + ) + + EXTERNAL_VECTOR_STORE_NAME = ConfigOption( + key="long-term-memory.external-vector-store-name", + config_type=str, + default=None, + ) + + ASYNC_COMPACTION = ConfigOption( + key="long-term-memory.async-compaction", + config_type=bool, + default=False, + ) + + class DatetimeRange(BaseModel): """Represents a datetime range.""" @@ -159,7 +182,7 @@ class MemorySet(BaseModel): def add( self, items: ItemType | List[ItemType], ids: str | List[str] | None = None - ) -> None: + ) -> List[str]: """Add a memory item to the set, currently only support item with type str or ChatMessage. @@ -169,8 +192,11 @@ class MemorySet(BaseModel): Args: items: The items to be inserted to this set. ids: The ids of the items to be inserted. Optional. + + Returns: + The IDs of the items added. """ - self.ltm.add(memory_set=self, memory_items=items, ids=ids) + return self.ltm.add(memory_set=self, memory_items=items, ids=ids) def get( self, ids: str | List[str] | None = None @@ -203,7 +229,7 @@ class BaseLongTermMemory(ABC, BaseModel): def get_or_create_memory_set( self, name: str, - item_type: str | Type[ChatMessage], + item_type: type[str] | Type[ChatMessage], capacity: int, compaction_strategy: CompactionStrategy, ) -> MemorySet: @@ -257,7 +283,7 @@ class BaseLongTermMemory(ABC, BaseModel): memory_items: ItemType | List[ItemType], ids: str | List[str] | None = None, metadatas: Dict[str, Any] | List[Dict[str, Any]] | None = None, - ) -> None: + ) -> List[str]: """Add items to the memory set, currently only support items with type str or ChatMessage. @@ -269,6 +295,9 @@ class BaseLongTermMemory(ABC, BaseModel): ids: The IDs of items. Will be automatically generated if not provided. Optional. metadatas: The metadata for items. Optional. + + Returns: + The IDs of added items. """ @abstractmethod diff --git a/python/flink_agents/runtime/memory/vector_store_long_term_memory.py b/python/flink_agents/runtime/memory/vector_store_long_term_memory.py index e4aabda..92ff6ac 100644 --- a/python/flink_agents/runtime/memory/vector_store_long_term_memory.py +++ b/python/flink_agents/runtime/memory/vector_store_long_term_memory.py @@ -29,6 +29,7 @@ from flink_agents.api.memory.long_term_memory import ( CompactionStrategyType, DatetimeRange, ItemType, + LongTermMemoryOptions, MemorySet, MemorySetItem, ) @@ -61,6 +62,10 @@ class VectorStoreLongTermMemory(BaseLongTermMemory): key: str = Field(description="Unique identifier for the keyed partition.") + async_compaction: bool = Field( + default=False, description="Whether to execute compact asynchronously." + ) + def __init__( self, *, @@ -76,6 +81,7 @@ class VectorStoreLongTermMemory(BaseLongTermMemory): vector_store=vector_store, job_id=job_id, key=key, + async_compaction=ctx.config.get(LongTermMemoryOptions.ASYNC_COMPACTION), **kwargs, ) @@ -137,7 +143,7 @@ class VectorStoreLongTermMemory(BaseLongTermMemory): memory_items: ItemType | List[ItemType], ids: str | List[str] | None = None, metadatas: Dict[str, Any] | List[Dict[str, Any]] | None = None, - ) -> None: + ) -> List[str]: memory_items = _maybe_cast_to_list(memory_items) ids = _maybe_cast_to_list(ids) metadatas = _maybe_cast_to_list(metadatas) @@ -171,13 +177,18 @@ class VectorStoreLongTermMemory(BaseLongTermMemory): ) ) - self.store.add( + ids = self.store.add( documents=documents, collection_name=self._name_mangling(memory_set.name) ) if memory_set.size >= memory_set.capacity: # trigger compaction - self._compact(memory_set) + if self.async_compaction: + self.ctx.executor.submit(self._compact, memory_set=memory_set) + else: + self._compact(memory_set=memory_set) + + return ids @override def get(
