GitHub user xintongsong closed a discussion: Short-Term Memory Design

# Clarification of Concepts
- **Run:** An agent run refers to the processing of a single input event.
- **Key:** In Flink Agents, inputs of the agent are partitioned by their keys. 
This corresponds to how data are partitioned by keys in Flink's Keyed 
DataStream.
- **Memory:** Data stored during execution of agents, for remembering things 
from previous agent runs and recall them in subsequent agent runs.
  - **Short-Term Memory:** Memory shared across all actions within an agent 
run, and multiple agent runs with the same input key. This corresponds to 
Flink's Keyed State, which is visible to processing of multiple records with in 
the same keyed partition, and is transparent to processing of data in other 
keyed partitions.
  - **Long-Term Memory:** Memory shared across all keyed partitions. (Design of 
long-term memory is not in the scope of this documentation.)

# Design Overview
![memory](https://github.com/user-attachments/assets/88bfda90-1066-4ad6-94bd-2cf446945319)

The above figure shows the design of Short-Term Memory.
- Logically, the short-term memory may have multiple fields, where each field 
is either a primary type or an objection. For an object, it may have more 
fields of primary types or objects. The latter results in a chain of nested 
objects.
- Physically, we store the short-term memory in a Flink MapState. This requires 
flatten the nested objects into a single-layer map. We use the absolute path, 
the sequence of field names starting from the root (Short-Term Memory) that 
points to the current field, as the map key. For primary types, the map value 
is the field value. For objects, the map value is a special mark indicating the 
existence of the object. The mark also contains a list of direct field names of 
the object, so that when fetching the whole object, we don't need to iterate 
over the entire map state looking for keys with certain prefix.

Please notice that the root node in the figure is also an object. Therefore, 
the key to the short-term memory design is around storing and accessing of 
objects.

# APIs

## MemoryObject

We introduced MemoryObject as the interface for accessing an object in the 
short-term memory.

```
class MemoryObject(ABC):
  """
  Representation of an object in the short-term memory.                
  """

  @abstractmethod
  def get(self, path: str) -> Any:
    """
    Get the value of a (direct or indirect) field in the object.

    Parameters
    ----------
    path: str
      Relative path from the current object to the target field. 

    Returns:
    -------
    Any
      The value of the field. If the field is an object, another MemoryObject 
will be returned. If the field doesn't exist, returns None.
    """

  @abstractmethod
  def set(self, path: str, value: Any):
    """
    Set the value of a (direct or indirect) field in the object.
    This will also create the intermediate objects if not exist.

    Parameters
    ----------
    path: str
      Relative path from the current object to the target field. 
    value: Any
      New value of the field. The type of the value must be either a primary 
type, or MemoryObject.
    """

  @abstractmethod
  def new_object(self, path: str) -> MemoryObject:
    """
    Create a new object as the value of a (direct or indirect) field in the 
object.

    Parameters
    ----------
    path: str
      Relative path from the current object to the target field. 

    Returns:
    -------
    MemoryObject
      The created object.
    """

  @abstractmethod
  def is_exist(self, path: str) -> bool:
    """
    Check whether a (direct or indirect) field exist in the object.

    Parameters
    ----------
    path: str
      Relative path from the current object to the target field. 

    Returns:
    -------
    bool
      Whether the field exists.
    """
  
  @abstractmethod
  def get_field_names(self) -> List[str]:
    """
    Get names of all the direct fields of the object.

    Returns:
    -------
    List[str]
      Direct field names of the object in a list.
    """

  @abstractmethod
  def get_fields(self) -> Dict[str, Any]:
    """
    Get all the direct fields of the object.

    Returns:
    -------
    Dict[str, Any]
      Direct fields in a dictionary.
    """
```

## RunnerContext

```
class RunnerContext(ABC):
  # ...

  @abstractmethod
  def get_short_term_memory(self) -> MemoryObject:
    """
    Get the short-term memory.

    Returns:
    -------
    MemoryObject
      The root object of the short-term memory.
    """
```

## Examples

```
@action(MyEvent)
@staticmethod
def my_action(event: Event, ctx: RunnerContext):
  # ...

  root = ctx.get_short_term_memory()

  root.set("x", 100)
  root.set("y", "abc")
  ojb_z = root.new_object("z")

  # {
  #   "x" : 100,
  #   "y" : "abc"
  #   "z" : {}
  # }

  obj_z.set("m", 0.5)
  obj_z.set("n.j", True)

  # {
  #   "x" : 100,
  #   "y" : "abc"
  #   "z" : {
  #     "m" : 0.5,
  #     "n" : {
  #       "j" : True
  #     }    
  #   }
  # }

  root.get("x") # 100
  root.get("z.m") # 0.5
  
  root.get("xx") # None
  root.get("z.mm") # None

  root.is_exist("x") # True
  root.is_exist("xx") # False
  root.is_exist("z.m") # True
  root.is_exist("z.mm") # False

  obj_n = root.get("z.n")
  obj_n.get("j") # True

  obj_z.get_field_names() # ["m", "n"]
  obj_z.get_fields() # {"m" : 0.5, "n" : MemoryObject}
```

GitHub link: https://github.com/apache/flink-agents/discussions/40

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]

Reply via email to