joeyutong opened a new issue, #723:
URL: https://github.com/apache/flink-agents/issues/723

   ### Search before asking
   
   - [x] I searched in the 
[issues](https://github.com/apache/flink-agents/issues) and found nothing 
similar.
   
   ### Description
   
   Python-side memory values can currently accept objects that are not stable 
across the Pemja / Flink state backend boundary. When such values are written 
to sensory memory or short-term memory and then checkpointed/restored, the 
restored Java state may contain `pemja.core.object.PyObject` / `PyJObject` 
wrappers rather than a materialized, process-independent payload. After a 
TaskManager / Python process restart, converting those restored objects back to 
Python can fail or crash in Pemja native conversion.
   
   There are two related user-facing problems:
   
   1. The memory documentation does not clearly define the Python-side value 
contract.
      The docs mention primitive types, collections, Java POJOs, general class 
types, and `MemoryObject`, but do not say whether Python values must be 
recursively JSON-like / checkpoint-stable. For example, it is unclear whether 
`uuid.UUID`, Pydantic models, custom Python classes, or a `dict` containing 
those objects are valid memory values.
   
   2. The built-in tool context path writes `UUID` objects into sensory memory.
      In `python/flink_agents/plan/actions/chat_model_action.py`, tool call 
context uses `initial_request_id: UUID` / `tool_request_event_id: UUID` in 
dictionaries stored in `ctx.sensory_memory`. For example:
      - `_update_tool_call_context(...)` stores a dict keyed by 
`initial_request_id`.
      - `_save_tool_request_event_context(...)` stores `"initial_request_id": 
initial_request_id` and stores the context under `tool_request_event_id`.
   
   With Pemja, `uuid.UUID` does not have a dedicated Python-to-Java conversion 
branch, so it falls back to a `PyObject` wrapper. That makes the built-in 
tool-call flow capable of storing a non-checkpoint-stable payload in Flink 
state.
   
   Expected behavior:
   
   - The Python memory value contract should be explicit and enforceable.
   - Built-in runtime memory payloads should not store raw Python objects such 
as `uuid.UUID`.
   - Values stored in Flink-backed memory should be safe to checkpoint and 
restore across TaskManager / Python process restarts.
   
   A reasonable Python-side contract could be to accept only recursively 
checkpoint-stable values such as:
   
   ```python
   None | bool | int | float | str | bytes | list[MemoryValue] | dict[str, 
MemoryValue]
   ```
   
   Pydantic values should be materialized before storing, for example:
   
   ```python
   ctx.sensory_memory.set("x", model.model_dump(mode="json"))
   ```
   
   or:
   
   ```python
   ctx.sensory_memory.set("x", json.loads(model.model_dump_json()))
   ```
   
   `uuid.UUID` should be stored as `str(uuid_value)` if it needs to be 
persisted.
   
   ### How to reproduce
   
   Use a Python Flink Agents streaming job with a real Flink state backend and 
checkpointing enabled. The important part is to test a real Flink Agent job 
path, not only direct Flink serializer behavior.
   
   1. Configure a Flink Agent job with keyed state, RocksDB state backend, 
filesystem checkpointing, and checkpointing enabled.
   2. In one action, write a Python dict containing a `uuid.UUID` to sensory 
memory:
   
   ```python
   from uuid import UUID
   
   ctx.sensory_memory.set(
       "restore.payload",
       {
           "request_id": UUID("00000000-0000-0000-0000-000000000001"),
           "metadata": {"input_id": 1},
       },
   )
   ```
   
   3. Trigger a follow-up action in the same agent run.
   4. Wait until a checkpoint containing the memory state completes.
   5. Kill the TaskManager and start a replacement TaskManager so the job 
restores from the checkpoint.
   6. In the follow-up action, read `ctx.sensory_memory.get("restore.payload")`.
   
   Observed behavior in a local repro:
   
   - RocksDB state restoration completes.
   - The restored action is re-executed after checkpoint restore.
   - When the restored memory value is converted back to Python, the JVM 
crashes in Pemja native code.
   
   Relevant log snippets:
   
   ```text
   Finished RocksDB incremental recovery in operator ActionExecutionOperator ...
   Python awaitable ref is null for action validate_after_tm_restart (likely 
restored from checkpoint), re-executing from beginning.
   SIGSEGV ... pemja_core.cpython-311-darwin.so ... JcpPyObject_FromJObject
   JcpPyDict_FromJMap
   ```
   
   As a control test, storing a plain string in sensory memory through the same 
real Flink Agent + RocksDB + checkpoint + TaskManager restart path succeeds. 
The value written by the old TaskManager is read successfully by the new 
TaskManager after restore.
   
   This suggests the issue is not that sensory memory is never persisted. The 
issue is specifically around Python-origin objects that Pemja cannot 
materialize into checkpoint-stable Java values before they are stored in Flink 
state.
   
   ### Version and environment
   
   - Flink Agents: current main branch local checkout
   - Python: 3.11
   - Pemja: 0.5.5
   - Java: 11
   - Flink/PyFlink: local mini-cluster using a PyFlink 2.2.0 distribution
   - State backend used for the local repro: RocksDB with filesystem checkpoint 
storage
   - Deployment mode: local standalone JobManager / TaskManager, manually 
killing and restarting the TaskManager to verify checkpoint restore behavior
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to