GitHub user xintongsong closed a discussion: Short-Term Memory Design
# Clarification of Concepts

- **Run:** An agent run refers to the processing of a single input event.
- **Key:** In Flink Agents, the inputs of an agent are partitioned by their keys. This corresponds to how data is partitioned by key in Flink's Keyed DataStream.
- **Memory:** Data stored during the execution of agents, used to remember things from previous agent runs and recall them in subsequent agent runs.
- **Short-Term Memory:** Memory shared across all actions within an agent run, and across multiple agent runs with the same input key. This corresponds to Flink's Keyed State, which is visible to the processing of multiple records within the same keyed partition, and is invisible to the processing of data in other keyed partitions.
- **Long-Term Memory:** Memory shared across all keyed partitions. (The design of long-term memory is outside the scope of this document.)

# Design Overview

[Figure: design of Short-Term Memory]

The above figure shows the design of Short-Term Memory.

- Logically, the short-term memory may have multiple fields, where each field is either a primitive type or an object. An object may in turn have more fields of primitive types or objects. The latter results in a chain of nested objects.
- Physically, we store the short-term memory in a Flink MapState. This requires flattening the nested objects into a single-layer map. We use the absolute path (the sequence of field names, starting from the root (Short-Term Memory), that leads to the current field) as the map key. For primitive types, the map value is the field value. For objects, the map value is a special mark indicating the existence of the object. The mark also contains a list of the object's direct field names, so that when fetching the whole object, we don't need to iterate over the entire map state looking for keys with a certain prefix.

Please note that the root node in the figure is also an object. Therefore, the core of the short-term memory design is how objects are stored and accessed.
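The physical layout described above can be illustrated with a minimal sketch. It is not the actual implementation: a plain Python `dict` stands in for the Flink MapState, and the names `ObjectMark`, `ROOT`, `new_store`, `set_value` and `get_value` are hypothetical, as is the choice of an empty string as the root key and `.` as the path separator in map keys. For simplicity, fetching an object here returns a plain nested `dict` rather than a `MemoryObject`.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

ROOT = ""  # hypothetical map key for the root object


@dataclass
class ObjectMark:
    """Map value for an object: marks its existence and lists its direct fields."""
    fields: List[str] = field(default_factory=list)


def _join(parent: str, name: str) -> str:
    """Build the absolute path of a direct field of `parent`."""
    return name if parent == ROOT else f"{parent}.{name}"


def new_store() -> Dict[str, Any]:
    """Create an empty store; the root is itself an object."""
    return {ROOT: ObjectMark()}


def set_value(store: Dict[str, Any], path: str, value: Any) -> None:
    """Set a field by absolute path, creating intermediate objects as needed.

    Simplification: overwriting an existing object with a primitive value
    does not clean up the entries of its former fields.
    """
    parent = ROOT
    parts = path.split(".")
    for i, name in enumerate(parts):
        mark = store[parent]  # always an ObjectMark at this point
        if name not in mark.fields:
            mark.fields.append(name)
        current = _join(parent, name)
        if i == len(parts) - 1:
            store[current] = value
        elif not isinstance(store.get(current), ObjectMark):
            store[current] = ObjectMark()
        parent = current


def get_value(store: Dict[str, Any], path: str = ROOT) -> Any:
    """Fetch a field; objects are rebuilt from their marks, without a prefix scan."""
    if path not in store:
        return None
    value = store[path]
    if isinstance(value, ObjectMark):
        return {name: get_value(store, _join(path, name)) for name in value.fields}
    return value


store = new_store()
set_value(store, "x", 100)
set_value(store, "z.m", 0.5)
set_value(store, "z.n.j", True)

print(sorted(store))          # ['', 'x', 'z', 'z.m', 'z.n', 'z.n.j']
print(get_value(store, "z"))  # {'m': 0.5, 'n': {'j': True}}
```

Because each mark lists the direct field names, fetching `z` touches only the keys `z`, `z.m`, `z.n` and `z.n.j`, regardless of how many other entries the map holds.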
# APIs

## MemoryObject

We introduce `MemoryObject` as the interface for accessing an object in the short-term memory.

```
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, Dict, List


class MemoryObject(ABC):
    """
    Representation of an object in the short-term memory.
    """

    @abstractmethod
    def get(self, path: str) -> Any:
        """
        Get the value of a (direct or indirect) field in the object.

        Parameters
        ----------
        path: str
            Relative path from the current object to the target field.

        Returns
        -------
        Any
            The value of the field. If the field is an object, another
            MemoryObject will be returned. If the field doesn't exist,
            returns None.
        """

    @abstractmethod
    def set(self, path: str, value: Any):
        """
        Set the value of a (direct or indirect) field in the object.
        This will also create the intermediate objects if they do not exist.

        Parameters
        ----------
        path: str
            Relative path from the current object to the target field.
        value: Any
            New value of the field. The type of the value must be either a
            primitive type or MemoryObject.
        """

    @abstractmethod
    def new_object(self, path: str) -> MemoryObject:
        """
        Create a new object as the value of a (direct or indirect) field in
        the object.

        Parameters
        ----------
        path: str
            Relative path from the current object to the target field.

        Returns
        -------
        MemoryObject
            The created object.
        """

    @abstractmethod
    def is_exist(self, path: str) -> bool:
        """
        Check whether a (direct or indirect) field exists in the object.

        Parameters
        ----------
        path: str
            Relative path from the current object to the target field.

        Returns
        -------
        bool
            Whether the field exists.
        """

    @abstractmethod
    def get_field_names(self) -> List[str]:
        """
        Get the names of all the direct fields of the object.

        Returns
        -------
        List[str]
            Direct field names of the object in a list.
        """

    @abstractmethod
    def get_fields(self) -> Dict[str, Any]:
        """
        Get all the direct fields of the object.

        Returns
        -------
        Dict[str, Any]
            Direct fields in a dictionary.
        """
```

## RunnerContext

```
class RunnerContext(ABC):
    # ...

    @abstractmethod
    def get_short_term_memory(self) -> MemoryObject:
        """
        Get the short-term memory.

        Returns
        -------
        MemoryObject
            The root object of the short-term memory.
        """
```

## Examples

```
@action(MyEvent)
@staticmethod
def my_action(event: Event, ctx: RunnerContext):
    # ...
    root = ctx.get_short_term_memory()

    root.set("x", 100)
    root.set("y", "abc")
    obj_z = root.new_object("z")
    # {
    #     "x" : 100,
    #     "y" : "abc",
    #     "z" : {}
    # }

    obj_z.set("m", 0.5)
    obj_z.set("n.j", True)  # also creates the intermediate object "n"
    # {
    #     "x" : 100,
    #     "y" : "abc",
    #     "z" : {
    #         "m" : 0.5,
    #         "n" : {
    #             "j" : True
    #         }
    #     }
    # }

    root.get("x")     # 100
    root.get("z.m")   # 0.5
    root.get("xx")    # None
    root.get("z.mm")  # None

    root.is_exist("x")     # True
    root.is_exist("xx")    # False
    root.is_exist("z.m")   # True
    root.is_exist("z.mm")  # False

    obj_n = root.get("z.n")
    obj_n.get("j")  # True

    obj_z.get_field_names()  # ["m", "n"]
    obj_z.get_fields()       # {"m" : 0.5, "n" : MemoryObject}
```

GitHub link: https://github.com/apache/flink-agents/discussions/40
