joeyutong opened a new issue, #723: URL: https://github.com/apache/flink-agents/issues/723
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/flink-agents/issues) and found nothing similar.

### Description

Python-side memory values can currently accept objects that are not stable across the Pemja / Flink state backend boundary. When such values are written to sensory memory or short-term memory and then checkpointed and restored, the restored Java state may contain `pemja.core.object.PyObject` / `PyJObject` wrappers rather than a materialized, process-independent payload. After a TaskManager / Python process restart, converting those restored objects back to Python can fail or crash in Pemja native conversion.

There are two related user-facing problems:

1. The memory documentation does not clearly define the Python-side value contract. The docs mention primitive types, collections, Java POJOs, general class types, and `MemoryObject`, but do not say whether Python values must be recursively JSON-like / checkpoint-stable. For example, it is unclear whether `uuid.UUID`, Pydantic models, custom Python classes, or a `dict` containing those objects are valid memory values.

2. The built-in tool context path writes `UUID` objects into sensory memory. In `python/flink_agents/plan/actions/chat_model_action.py`, the tool call context uses `initial_request_id: UUID` / `tool_request_event_id: UUID` in dictionaries stored in `ctx.sensory_memory`. For example:

   - `_update_tool_call_context(...)` stores a dict keyed by `initial_request_id`.
   - `_save_tool_request_event_context(...)` stores `"initial_request_id": initial_request_id` and stores the context under `tool_request_event_id`.

   With Pemja, `uuid.UUID` does not have a dedicated Python-to-Java conversion branch, so it falls back to a `PyObject` wrapper. As a result, the built-in tool-call flow can store a non-checkpoint-stable payload in Flink state.

Expected behavior:

- The Python memory value contract should be explicit and enforceable.
- Built-in runtime memory payloads should not store raw Python objects such as `uuid.UUID`.
- Values stored in Flink-backed memory should be safe to checkpoint and restore across TaskManager / Python process restarts.

A reasonable Python-side contract could be to accept only recursively checkpoint-stable values such as:

```python
None | bool | int | float | str | bytes | list[MemoryValue] | dict[str, MemoryValue]
```

Pydantic values should be materialized before storing, for example:

```python
ctx.sensory_memory.set("x", model.model_dump(mode="json"))
```

or:

```python
import json

ctx.sensory_memory.set("x", json.loads(model.model_dump_json()))
```

`uuid.UUID` should be stored as `str(uuid_value)` if it needs to be persisted.

### How to reproduce

Use a Python Flink Agents streaming job with a real Flink state backend and checkpointing enabled. The important part is to test a real Flink Agent job path, not only direct Flink serializer behavior.

1. Configure a Flink Agent job with keyed state, RocksDB state backend, filesystem checkpointing, and checkpointing enabled.
2. In one action, write a Python dict containing a `uuid.UUID` to sensory memory:

   ```python
   from uuid import UUID

   ctx.sensory_memory.set(
       "restore.payload",
       {
           "request_id": UUID("00000000-0000-0000-0000-000000000001"),
           "metadata": {"input_id": 1},
       },
   )
   ```

3. Trigger a follow-up action in the same agent run.
4. Wait until a checkpoint containing the memory state completes.
5. Kill the TaskManager and start a replacement TaskManager so the job restores from the checkpoint.
6. In the follow-up action, read `ctx.sensory_memory.get("restore.payload")`.

Observed behavior in a local repro:

- RocksDB state restoration completes.
- The restored action is re-executed after checkpoint restore.
- When the restored memory value is converted back to Python, the JVM crashes in Pemja native code.

Relevant log snippets:

```text
Finished RocksDB incremental recovery in operator ActionExecutionOperator ...
Python awaitable ref is null for action validate_after_tm_restart (likely restored from checkpoint), re-executing from beginning.
SIGSEGV ... pemja_core.cpython-311-darwin.so ... JcpPyObject_FromJObject JcpPyDict_FromJMap
```

As a control test, storing a plain string in sensory memory through the same real Flink Agent + RocksDB + checkpoint + TaskManager restart path succeeds. The value written by the old TaskManager is read successfully by the new TaskManager after restore.

This suggests that sensory memory itself is persisted correctly. The issue is specific to Python-origin objects that Pemja cannot materialize into checkpoint-stable Java values before they are stored in Flink state.

### Version and environment

- Flink Agents: current main branch local checkout
- Python: 3.11
- Pemja: 0.5.5
- Java: 11
- Flink/PyFlink: local mini-cluster using a PyFlink 2.2.0 distribution
- State backend used for the local repro: RocksDB with filesystem checkpoint storage
- Deployment mode: local standalone JobManager / TaskManager, manually killing and restarting the TaskManager to verify checkpoint restore behavior

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
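
One way the value contract proposed in the Description could be applied on the caller side is to materialize values before they reach `ctx.sensory_memory.set(...)`. The following is only a minimal sketch under stated assumptions, not part of Flink Agents: `to_memory_value` is a hypothetical helper name, Pydantic models are detected by duck typing on `model_dump`, tuples are flattened to lists, and exact type checks are used on the assumption that subclasses of `int` or `str` may not convert cleanly either.

```python
import uuid
from typing import Any

# Scalar types listed in the proposed contract. Exact type matches only:
# subclasses (e.g. enum.IntEnum members) are rejected in this sketch.
_STABLE_SCALARS = (type(None), bool, int, float, str, bytes)


def to_memory_value(value: Any) -> Any:
    """Recursively convert `value` into the proposed checkpoint-stable shape.

    Hypothetical helper; raises TypeError for anything it cannot materialize.
    """
    if type(value) in _STABLE_SCALARS:
        return value
    if isinstance(value, uuid.UUID):
        # The issue suggests persisting UUIDs as str(uuid_value).
        return str(value)
    # Assumption: objects exposing model_dump() are Pydantic v2 models.
    model_dump = getattr(value, "model_dump", None)
    if callable(model_dump):
        return to_memory_value(model_dump(mode="json"))
    if isinstance(value, (list, tuple)):
        return [to_memory_value(item) for item in value]
    if isinstance(value, dict):
        stable = {}
        for key, item in value.items():
            # UUID keys (as in the tool call context) become strings.
            if isinstance(key, uuid.UUID):
                key = str(key)
            if type(key) is not str:
                raise TypeError(f"unsupported dict key type: {type(key).__name__}")
            stable[key] = to_memory_value(item)
        return stable
    raise TypeError(f"unsupported memory value type: {type(value).__name__}")


# The payload from the reproduction steps, materialized before storing.
payload = {
    "request_id": uuid.UUID("00000000-0000-0000-0000-000000000001"),
    "metadata": {"input_id": 1},
}
stable_payload = to_memory_value(payload)
# Inside an action, the stable form would then be stored, for example:
#   ctx.sensory_memory.set("restore.payload", stable_payload)
```

Raising `TypeError` on unknown types, rather than falling back to `str(value)` or `repr(value)`, keeps the contract enforceable: an unsupported object is reported at write time instead of surfacing later during checkpoint restore.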
