This is an automated email from the ASF dual-hosted git repository.
chenliang613 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 0aa45ae53e fix contextstore issues
0aa45ae53e is described below
commit 0aa45ae53e3732eeda436a80e18649b265520727
Author: chenliang613 <[email protected]>
AuthorDate: Sat Feb 21 16:49:04 2026 +0800
fix contextstore issues
---
Agent_module/ContextStore.py | 722 ++++++++++++++++++++++++++++++++++++++
Agent_module/ContextStore_Demo.py | 274 +++++++++++++++
2 files changed, 996 insertions(+)
diff --git a/Agent_module/ContextStore.py b/Agent_module/ContextStore.py
new file mode 100644
index 0000000000..34e5d56442
--- /dev/null
+++ b/Agent_module/ContextStore.py
@@ -0,0 +1,722 @@
+"""
+ContextStore.py
+===============
+Unified, thread-safe, async-first context exchange for multi-agent systems.
+
+Replaces SharedContext + ContextBridge with a single module that provides:
+ - Keyed state store with full write history (last 50 writes per key)
+ - Point-to-point FIFO message channels between agents
+ - Pub/sub notifications supporting both sync and async callbacks
+ - TTL, tagging, querying, merge, snapshot, and serialisation
+
+Usage:
+ store = ContextStore()
+
+ # State store
+ rec = await store.put("task", {"id": 1}, writer="coordinator")
+ val = await store.get("task")
+
+ # Channels
+ await store.send("coordinator", "worker", {"cmd": "run"})
+ msg = await store.receive("coordinator", "worker")
+
+ # Pub/sub
+ store.subscribe("monitor", my_callback)
+
+ # Namespace-scoped view
+ ns = ContextNamespace(store, "pipeline_v1")
+ await ns.put("status", "running", writer="coordinator")
+"""
+
+from __future__ import annotations
+
+import asyncio
+import copy
+import json
+import uuid
+from collections import deque
+from dataclasses import dataclass, field
+from datetime import datetime
+from typing import Any, Callable, Dict, List, Optional, Set, Tuple
+
+
+# ============================================================================
+# Data Models
+# ============================================================================
+
+
+@dataclass(frozen=True)
+class ContextRecord:
+ """
+ Immutable record of a single write.
+
+ Every put() call appends a new ContextRecord to the key's history.
+ Frozen so the record is safe to share across tasks without copying.
+ """
+
+ key: str
+ value: Any
+ writer: str
+ version: int
+ timestamp: str
+ record_id: str
+ ttl: Optional[int] # seconds until expiry; None = never expires
+ tags: Tuple[str, ...]
+
+ def is_expired(self) -> bool:
+ """Return True if this record has passed its TTL."""
+ if self.ttl is None:
+ return False
+ written_at = datetime.fromisoformat(self.timestamp)
+ return (datetime.now() - written_at).total_seconds() > self.ttl
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "key": self.key,
+ "value": self.value,
+ "writer": self.writer,
+ "version": self.version,
+ "timestamp": self.timestamp,
+ "record_id": self.record_id,
+ "ttl": self.ttl,
+ "tags": list(self.tags),
+ }
+
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> "ContextRecord":
+ return cls(
+ key=data["key"],
+ value=data["value"],
+ writer=data["writer"],
+ version=data["version"],
+ timestamp=data["timestamp"],
+ record_id=data["record_id"],
+ ttl=data.get("ttl"),
+ tags=tuple(data.get("tags", [])),
+ )
+
+
+@dataclass
+class ContextEntry:
+ """
+ Active state for one key: current record + full write history.
+
+ history is a deque with maxlen=50; oldest records are dropped
+ automatically when the limit is reached.
+ """
+
+ current: ContextRecord
+ history: deque = field(default_factory=lambda: deque(maxlen=50))
+
+ def is_expired(self) -> bool:
+ return self.current.is_expired()
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "current": self.current.to_dict(),
+ "history": [r.to_dict() for r in self.history],
+ }
+
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> "ContextEntry":
+ current = ContextRecord.from_dict(data["current"])
+ history: deque = deque(
+ [ContextRecord.from_dict(r) for r in data.get("history", [])],
+ maxlen=50,
+ )
+ return cls(current=current, history=history)
+
+
+@dataclass
+class ContextMessage:
+ """Directed message from one agent to another."""
+
+ sender: str
+ receiver: str
+ payload: Any
+ timestamp: str
+ message_id: str
+ tags: Tuple[str, ...]
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "sender": self.sender,
+ "receiver": self.receiver,
+ "payload": self.payload,
+ "timestamp": self.timestamp,
+ "message_id": self.message_id,
+ "tags": list(self.tags),
+ }
+
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> "ContextMessage":
+ return cls(
+ sender=data["sender"],
+ receiver=data["receiver"],
+ payload=data["payload"],
+ timestamp=data["timestamp"],
+ message_id=data["message_id"],
+ tags=tuple(data.get("tags", [])),
+ )
+
+
+# ============================================================================
+# ContextChannel — point-to-point FIFO queue
+# ============================================================================
+
+
+class ContextChannel:
+ """FIFO message queue from one specific agent to another."""
+
+ def __init__(self) -> None:
+ self._queue: deque[ContextMessage] = deque()
+ self._lock = asyncio.Lock()
+
+ async def send(self, message: ContextMessage) -> None:
+ async with self._lock:
+ self._queue.append(message)
+
+ async def receive(self) -> Optional[ContextMessage]:
+ """Pop and return the oldest message, or None if empty."""
+ async with self._lock:
+ return self._queue.popleft() if self._queue else None
+
+ async def receive_all(self) -> List[ContextMessage]:
+ """Pop and return all pending messages in FIFO order."""
+ async with self._lock:
+ msgs = list(self._queue)
+ self._queue.clear()
+ return msgs
+
+ async def peek(self) -> Optional[ContextMessage]:
+ """Return the oldest message without removing it."""
+ async with self._lock:
+ return self._queue[0] if self._queue else None
+
+ def __len__(self) -> int:
+ return len(self._queue)
+
+
+# ============================================================================
+# Subscription
+# ============================================================================
+
+
+@dataclass
+class Subscription:
+ """Internal pub/sub record."""
+
+ agent_id: str
+ namespace: str
+ keys: Set[str] # empty set = subscribe to ALL keys in namespace
+ callback: Callable # sync or async; called as callback(key, record)
+
+
+# ============================================================================
+# ContextStore — core
+# ============================================================================
+
+
+class ContextStore:
+ """
+ Unified, thread-safe, async-first context exchange for multi-agent systems.
+
+ Internal layout
+ ---------------
+ _entries: Dict[namespace, Dict[key, ContextEntry]] — asyncio.Lock
protected
+ _channels: Dict[(sender, receiver), ContextChannel] — each has its own
lock
+ _subs: Dict[namespace, Dict[agent_id, Subscription]]
+ _lock: asyncio.Lock (single global lock protecting _entries)
+
+ Notification contract
+ ---------------------
+ - Callbacks fire AFTER the lock is released to avoid re-entrant deadlocks.
+ - Writers never receive their own notifications.
+ - Both sync and async callbacks are supported.
+
+ TTL eviction
+ ------------
+ - Lazy eviction on get_entry() reads.
+ - Batch eviction via cleanup_expired().
+ """
+
+ def __init__(self) -> None:
+ self._entries: Dict[str, Dict[str, ContextEntry]] = {}
+ self._channels: Dict[Tuple[str, str], ContextChannel] = {}
+ self._subs: Dict[str, Dict[str, Subscription]] = {}
+ self._lock = asyncio.Lock()
+
+ # -----------------------------------------------------------------------
+ # Internal helpers
+ # -----------------------------------------------------------------------
+
+ def _ns_entries(self, namespace: str) -> Dict[str, ContextEntry]:
+ """Return (creating if needed) the entries dict for *namespace*."""
+ if namespace not in self._entries:
+ self._entries[namespace] = {}
+ return self._entries[namespace]
+
+ def _ns_subs(self, namespace: str) -> Dict[str, Subscription]:
+ if namespace not in self._subs:
+ self._subs[namespace] = {}
+ return self._subs[namespace]
+
+ # -----------------------------------------------------------------------
+ # State API — writes
+ # -----------------------------------------------------------------------
+
+ async def put(
+ self,
+ key: str,
+ value: Any,
+ writer: str,
+ namespace: str = "default",
+ ttl: Optional[int] = None,
+ tags: Optional[List[str]] = None,
+ ) -> ContextRecord:
+ """
+ Write *key* → *value* on behalf of *writer*.
+
+ Creates a new ContextRecord and appends it to the key's history.
+ Notifies subscribers after the lock is released.
+ Returns the new ContextRecord.
+ """
+ to_notify: List[Subscription] = []
+ record: ContextRecord
+
+ async with self._lock:
+ entries = self._ns_entries(namespace)
+ existing = entries.get(key)
+ version = (existing.current.version + 1) if existing else 1
+
+ record = ContextRecord(
+ key=key,
+ value=value,
+ writer=writer,
+ version=version,
+ timestamp=datetime.now().isoformat(),
+ record_id=str(uuid.uuid4()),
+ ttl=ttl,
+ tags=tuple(tags or []),
+ )
+
+ if existing is None:
+ entry = ContextEntry(current=record)
+ entry.history.append(record)
+ entries[key] = entry
+ else:
+ existing.history.append(record)
+ existing.current = record
+
+ # Collect subscribers before releasing the lock
+ for sub in self._ns_subs(namespace).values():
+ if sub.agent_id == writer:
+ continue # no self-notification
+ if sub.keys and key not in sub.keys:
+ continue
+ to_notify.append(sub)
+
+ # Fire callbacks AFTER the lock is released
+ for sub in to_notify:
+ try:
+ result = sub.callback(key, record)
+ if asyncio.iscoroutine(result):
+ await result
+ except Exception:
+ pass # bad callbacks must not crash the store
+
+ return record
+
+ # -----------------------------------------------------------------------
+ # State API — reads
+ # -----------------------------------------------------------------------
+
+ async def get(self, key: str, namespace: str = "default") -> Optional[Any]:
+ """Return current value for *key*, or None if missing or
TTL-expired."""
+ entry = await self.get_entry(key, namespace)
+ return entry.current.value if entry else None
+
+ async def get_entry(
+ self, key: str, namespace: str = "default"
+ ) -> Optional[ContextEntry]:
+ """
+ Return the full ContextEntry (current + history) for *key*.
+
+ Lazily evicts the entry if its TTL has expired.
+ Returns None if missing or expired.
+ """
+ async with self._lock:
+ entries = self._ns_entries(namespace)
+ entry = entries.get(key)
+ if entry is None:
+ return None
+ if entry.is_expired():
+ del entries[key]
+ return None
+ return entry
+
+ async def delete(
+ self, key: str, writer: str, namespace: str = "default"
+ ) -> bool:
+ """Remove *key* from the store. Returns True if it existed."""
+ async with self._lock:
+ entries = self._ns_entries(namespace)
+ if key in entries:
+ del entries[key]
+ return True
+ return False
+
+ async def history(
+ self,
+ key: str,
+ namespace: str = "default",
+ limit: Optional[int] = None,
+ ) -> List[ContextRecord]:
+ """
+ Return write history for *key*, newest-first.
+
+ Returns at most *limit* records if specified.
+ Returns [] if the key does not exist or is expired.
+ """
+ entry = await self.get_entry(key, namespace)
+ if entry is None:
+ return []
+ records = list(reversed(entry.history))
+ return records[:limit] if limit is not None else records
+
+ async def keys(self, namespace: str = "default") -> List[str]:
+ """Return all non-expired keys in *namespace*."""
+ await self.cleanup_expired(namespace)
+ async with self._lock:
+ return list(self._ns_entries(namespace).keys())
+
+ async def query(
+ self,
+ namespace: str = "default",
+ *,
+ writer: Optional[str] = None,
+ tags: Optional[List[str]] = None,
+ since: Optional[str] = None, # ISO-8601 datetime string (inclusive)
+ until: Optional[str] = None, # ISO-8601 datetime string (inclusive)
+ ) -> List[ContextEntry]:
+ """
+ Return ContextEntry objects whose current record matches all filters.
+
+ writer — only entries last written by this agent
+ tags — entry must have ALL of these tags
+ since — entry timestamp >= since
+ until — entry timestamp <= until
+ """
+ await self.cleanup_expired(namespace)
+ tag_set = set(tags) if tags else None
+
+ async with self._lock:
+ entries = self._ns_entries(namespace)
+ results: List[ContextEntry] = []
+ for entry in entries.values():
+ rec = entry.current
+ if writer and rec.writer != writer:
+ continue
+ if tag_set and not tag_set.issubset(set(rec.tags)):
+ continue
+ if since and rec.timestamp < since:
+ continue
+ if until and rec.timestamp > until:
+ continue
+ results.append(entry)
+ return results
+
+ # -----------------------------------------------------------------------
+ # Channel API — point-to-point messaging
+ # -----------------------------------------------------------------------
+
+ def channel(self, sender: str, receiver: str) -> ContextChannel:
+ """Return (creating if needed) the channel from *sender* to
*receiver*."""
+ key = (sender, receiver)
+ if key not in self._channels:
+ self._channels[key] = ContextChannel()
+ return self._channels[key]
+
+ async def send(
+ self,
+ sender: str,
+ receiver: str,
+ payload: Any,
+ tags: Optional[List[str]] = None,
+ ) -> ContextMessage:
+ """Send a directed message from *sender* to *receiver*."""
+ msg = ContextMessage(
+ sender=sender,
+ receiver=receiver,
+ payload=payload,
+ timestamp=datetime.now().isoformat(),
+ message_id=str(uuid.uuid4()),
+ tags=tuple(tags or []),
+ )
+ await self.channel(sender, receiver).send(msg)
+ return msg
+
+ async def receive(
+ self, sender: str, receiver: str
+ ) -> Optional[ContextMessage]:
+ """Pop the next message from the *sender* → *receiver* channel."""
+ return await self.channel(sender, receiver).receive()
+
+ async def inbox(self, receiver: str) -> List[ContextMessage]:
+ """
+ Collect and return all pending messages for *receiver* across every
+ sender channel, sorted by timestamp (oldest first).
+
+ This is a destructive read — messages are removed from the channel.
+ """
+ msgs: List[ContextMessage] = []
+ for (s, r), ch in self._channels.items():
+ if r == receiver:
+ msgs.extend(await ch.receive_all())
+ msgs.sort(key=lambda m: m.timestamp)
+ return msgs
+
+ # -----------------------------------------------------------------------
+ # Pub/Sub API
+ # -----------------------------------------------------------------------
+
+ def subscribe(
+ self,
+ agent_id: str,
+ callback: Callable,
+ namespace: str = "default",
+ keys: Optional[List[str]] = None,
+ ) -> None:
+ """
+ Register *agent_id* to be notified on writes to *keys*.
+
+ keys=None (or []) means subscribe to ALL keys in the namespace.
+ callback(key, record) is called after each put(); both sync and
+ async callbacks are supported.
+ """
+ subs = self._ns_subs(namespace)
+ subs[agent_id] = Subscription(
+ agent_id=agent_id,
+ namespace=namespace,
+ keys=set(keys) if keys else set(),
+ callback=callback,
+ )
+
+ def unsubscribe(self, agent_id: str, namespace: str = "default") -> None:
+ """Remove *agent_id*'s subscription in *namespace*."""
+ self._ns_subs(namespace).pop(agent_id, None)
+
+ # -----------------------------------------------------------------------
+ # Maintenance
+ # -----------------------------------------------------------------------
+
+ async def cleanup_expired(self, namespace: str = "default") -> int:
+ """
+ Remove all TTL-expired entries from *namespace*.
+ Returns the number of keys removed.
+ """
+ async with self._lock:
+ entries = self._ns_entries(namespace)
+ expired = [k for k, e in entries.items() if e.is_expired()]
+ for k in expired:
+ del entries[k]
+ return len(expired)
+
+ async def snapshot(self, namespace: str = "default") -> Dict[str, Any]:
+ """
+ Return a deep copy of current non-expired entries in *namespace*
+ as a plain dictionary (key → entry dict).
+ """
+ await self.cleanup_expired(namespace)
+ async with self._lock:
+ entries = self._ns_entries(namespace)
+ return copy.deepcopy({k: e.to_dict() for k, e in entries.items()})
+
+ async def merge(
+ self, other: "ContextStore", namespace: str = "default"
+ ) -> int:
+ """
+ Merge entries from *other* into this store for *namespace*.
+
+ Uses last-write-wins by version number: an entry from *other* is
+ accepted only if its current version is higher than the local one.
+ Returns the number of entries merged (updated or inserted).
+ """
+ # Copy other's entries while holding *its* lock (avoids concurrent
mutation)
+ async with other._lock:
+ other_snapshot = copy.deepcopy(
+ dict(other._entries.get(namespace, {}))
+ )
+
+ merged = 0
+ async with self._lock:
+ entries = self._ns_entries(namespace)
+ for key, other_entry in other_snapshot.items():
+ existing = entries.get(key)
+ if (
+ existing is None
+ or other_entry.current.version > existing.current.version
+ ):
+ entries[key] = other_entry
+ merged += 1
+ return merged
+
+ async def stats(self) -> Dict[str, Any]:
+ """Return high-level statistics about the store."""
+ async with self._lock:
+ ns_stats = {}
+ for ns, entries in self._entries.items():
+ subs = self._subs.get(ns, {})
+ ns_stats[ns] = {
+ "total_keys": len(entries),
+ "expired_keys": sum(
+ 1 for e in entries.values() if e.is_expired()
+ ),
+ "subscribers": list(subs.keys()),
+ }
+
+ channel_stats = {
+ f"{s}->{r}": len(ch)
+ for (s, r), ch in self._channels.items()
+ }
+
+ return {
+ "namespaces": ns_stats,
+ "channels": channel_stats,
+ "total_channels": len(self._channels),
+ }
+
+ async def to_dict(self) -> Dict[str, Any]:
+ """Full serialisation of the store to a plain Python dict."""
+ # Snapshot all namespaces, filtering out expired entries in one lock
pass
+ async with self._lock:
+ entries_dict: Dict[str, Any] = {}
+ for ns, entries in self._entries.items():
+ entries_dict[ns] = {
+ k: e.to_dict()
+ for k, e in entries.items()
+ if not e.is_expired()
+ }
+
+ # Snapshot channels (each under its own lock)
+ channels_dict: Dict[str, Any] = {}
+ for (s, r), ch in self._channels.items():
+ async with ch._lock:
+ channels_dict[f"{s}->{r}"] = [m.to_dict() for m in ch._queue]
+
+ subs_dict: Dict[str, Any] = {
+ ns: {
+ aid: {
+ "agent_id": sub.agent_id,
+ "namespace": sub.namespace,
+ "keys": list(sub.keys),
+ }
+ for aid, sub in subs.items()
+ }
+ for ns, subs in self._subs.items()
+ }
+
+ return {
+ "entries": entries_dict,
+ "channels": channels_dict,
+ "subscriptions": subs_dict,
+ }
+
+ async def to_json(self, indent: int = 2) -> str:
+ """Return a JSON string of the full store."""
+ return json.dumps(await self.to_dict(), indent=indent,
ensure_ascii=False)
+
+
+# ============================================================================
+# ContextNamespace — namespace-scoped proxy view
+# ============================================================================
+
+
+class ContextNamespace:
+ """
+ A proxy view of ContextStore that fixes the *namespace* parameter.
+
+ All methods have the same signatures as ContextStore but without needing
+ to specify *namespace* on every call.
+
+ Example:
+ store = ContextStore()
+ ns = ContextNamespace(store, "pipeline_v1")
+ await ns.put("status", "running", writer="coordinator")
+ val = await ns.get("status")
+ """
+
+ def __init__(self, store: ContextStore, namespace: str) -> None:
+ self._store = store
+ self._namespace = namespace
+
+ @property
+ def namespace(self) -> str:
+ return self._namespace
+
+ # State API
+
+ async def put(
+ self,
+ key: str,
+ value: Any,
+ writer: str,
+ ttl: Optional[int] = None,
+ tags: Optional[List[str]] = None,
+ ) -> ContextRecord:
+ return await self._store.put(
+ key, value, writer, self._namespace, ttl, tags
+ )
+
+ async def get(self, key: str) -> Optional[Any]:
+ return await self._store.get(key, self._namespace)
+
+ async def get_entry(self, key: str) -> Optional[ContextEntry]:
+ return await self._store.get_entry(key, self._namespace)
+
+ async def delete(self, key: str, writer: str) -> bool:
+ return await self._store.delete(key, writer, self._namespace)
+
+ async def history(
+ self, key: str, limit: Optional[int] = None
+ ) -> List[ContextRecord]:
+ return await self._store.history(key, self._namespace, limit)
+
+ async def keys(self) -> List[str]:
+ return await self._store.keys(self._namespace)
+
+ async def query(
+ self,
+ *,
+ writer: Optional[str] = None,
+ tags: Optional[List[str]] = None,
+ since: Optional[str] = None,
+ until: Optional[str] = None,
+ ) -> List[ContextEntry]:
+ return await self._store.query(
+ self._namespace, writer=writer, tags=tags, since=since, until=until
+ )
+
+ # Pub/Sub API
+
+ def subscribe(
+ self,
+ agent_id: str,
+ callback: Callable,
+ keys: Optional[List[str]] = None,
+ ) -> None:
+ self._store.subscribe(agent_id, callback, self._namespace, keys)
+
+ def unsubscribe(self, agent_id: str) -> None:
+ self._store.unsubscribe(agent_id, self._namespace)
+
+ # Maintenance / serialisation
+
+ async def cleanup_expired(self) -> int:
+ return await self._store.cleanup_expired(self._namespace)
+
+ async def snapshot(self) -> Dict[str, Any]:
+ return await self._store.snapshot(self._namespace)
+
+ async def merge(self, other: ContextStore) -> int:
+ return await self._store.merge(other, self._namespace)
diff --git a/Agent_module/ContextStore_Demo.py
b/Agent_module/ContextStore_Demo.py
new file mode 100644
index 0000000000..5552f839da
--- /dev/null
+++ b/Agent_module/ContextStore_Demo.py
@@ -0,0 +1,274 @@
+"""
+ContextStore_Demo.py
+====================
+End-to-end demonstration of the ContextStore multi-agent data exchange system.
+
+Scenarios covered
+-----------------
+ 1. coordinator writes task → worker reads and processes → monitor observes
+ changes via pub/sub async callback
+ 2. history() returns all write versions (newest-first)
+ 3. Directed channel: coordinator → worker messages; inbox() aggregates all
+ 4. TTL entry written, expires after 3 seconds, get() returns None
+ 5. query(writer="worker") returns only entries written by worker
+ 6. merge() correctly resolves version conflicts (higher version wins)
+ 7. to_json() fully serialises the entire store
+
+Run:
+ python3 Agent_module/ContextStore_Demo.py
+"""
+
+import asyncio
+import json
+import sys
+import os
+
+# ---------------------------------------------------------------------------
+# Load ContextStore from the same directory (supports .py extension)
+# ---------------------------------------------------------------------------
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+from ContextStore import (
+ ContextStore,
+ ContextNamespace,
+ ContextRecord,
+)
+
+
+# ============================================================================
+# Helpers
+# ============================================================================
+
+
+def section(title: str) -> None:
+ print("\n" + "=" * 70)
+ print(f" {title}")
+ print("=" * 70)
+
+
+# ============================================================================
+# Shared observation log for the async monitor callback
+# ============================================================================
+
+_change_log: list = []
+
+
+async def monitor_callback(key: str, record: ContextRecord) -> None:
+ """Async pub/sub callback — called after every put() that monitor
watches."""
+ msg = (
+ f" [MONITOR] key={key!r:20s} "
+ f"writer={record.writer:15s} "
+ f"v{record.version} "
+ f"value={str(record.value)!r:.55s}"
+ )
+ print(msg)
+ _change_log.append(
+ {"key": key, "writer": record.writer, "value": record.value}
+ )
+
+
+# ============================================================================
+# Main demo
+# ============================================================================
+
+
+async def main() -> None:
+
+ store = ContextStore()
+ ns = ContextNamespace(store, "data_pipeline_v1")
+
+ # -----------------------------------------------------------------------
+ # 1. Setup — Subscribe monitor, then coordinator assigns task
+ # -----------------------------------------------------------------------
+ section("1. Setup + coordinator assigns task [pub/sub demo]")
+
+ # Monitor subscribes to ALL keys in the namespace (async callback)
+ ns.subscribe("monitor", monitor_callback)
+
+ # Coordinator writes task and initial status
+ await ns.put("task", {"id": "task_001", "description": "Analyse Q1 sales",
"priority": "high"}, writer="coordinator")
+ await ns.put("status", "pending", writer="coordinator")
+ await ns.put("deadline", "2026-02-18T18:00:00", writer="coordinator",
tags=["meta"])
+
+ # Worker reads task, updates status, writes result
+ task = await ns.get("task")
+ print(f"\n Worker received task: {task}")
+
+ await ns.put("status", "in_progress", writer="worker")
+ print(" Worker processing …")
+ await asyncio.sleep(0.05)
+
+ await ns.put(
+ "result",
+ {"task_id": task["id"], "summary": "Q1 revenue +12% YoY; top SKU:
widget-X", "rows_processed": 14_320},
+ writer="worker",
+ tags=["output"],
+ )
+ await ns.put("status", "completed", writer="worker")
+
+ print(f"\n Final status: {await ns.get('status')!r}")
+ print(f" Result: {await ns.get('result')}")
+
+ # -----------------------------------------------------------------------
+ # 2. history() — newest-first
+ # -----------------------------------------------------------------------
+ section("2. history('status') — all write versions, newest-first")
+
+ records = await ns.history("status")
+ for r in records:
+ print(f" v{r.version} writer={r.writer:15s} value={r.value!r}
ts={r.timestamp}")
+
+ assert records[0].value == "completed", "Newest record should be
'completed'"
+ assert records[-1].value == "pending", "Oldest record should be 'pending'"
+ print(f"\n Total writes to 'status': {len(records)} ✓")
+
+ # -----------------------------------------------------------------------
+ # 3. Directed channels + inbox()
+ # -----------------------------------------------------------------------
+ section("3. Directed channels: coordinator → worker, inbox() aggregation")
+
+ await store.send("coordinator", "worker", {"cmd": "run_report", "format":
"pdf"}, tags=["cmd"])
+ await store.send("coordinator", "worker", {"cmd": "upload_results",
"dest": "s3://bucket/q1"}, tags=["cmd"])
+ await store.send("monitor", "worker", {"cmd": "health_check"},
tags=["admin"])
+
+ ch_len = len(store.channel("coordinator", "worker"))
+ print(f" coordinator→worker queue depth: {ch_len}")
+
+ # peek without consuming
+ peeked = await store.channel("coordinator", "worker").peek()
+ print(f" Peeked first message: {peeked.payload}")
+
+ # inbox() collects from ALL senders and sorts by time
+ inbox_msgs = await store.inbox("worker")
+ print(f"\n inbox('worker') received {len(inbox_msgs)} message(s):")
+ for m in inbox_msgs:
+ print(f" from={m.sender:15s} payload={m.payload}")
+
+ assert len(inbox_msgs) == 3, "Worker should have 3 messages in inbox"
+ print(" inbox() aggregation ✓")
+
+ # -----------------------------------------------------------------------
+ # 4. TTL — entry expires after 3 seconds
+ # -----------------------------------------------------------------------
+ section("4. TTL — heartbeat entry expires after 3 seconds")
+
+ await ns.put(
+ "heartbeat",
+ {"alive": True, "load": 0.42},
+ writer="worker",
+ ttl=3,
+ tags=["health"],
+ )
+ immediate = await ns.get("heartbeat")
+ print(f" Immediate read → {immediate}")
+ assert immediate is not None, "Heartbeat should be readable immediately"
+
+ print(" Sleeping 3.5 s for TTL expiry …")
+ await asyncio.sleep(3.5)
+
+ expired_val = await ns.get("heartbeat")
+ print(f" After 3.5 s read → {expired_val} (None = expired correctly)")
+ assert expired_val is None, "Heartbeat should have expired"
+
+ removed = await ns.cleanup_expired()
+ print(f" cleanup_expired() removed {removed} additional stale key(s) ✓")
+
+ # -----------------------------------------------------------------------
+ # 5. query(writer="worker") — filter by writer
+ # -----------------------------------------------------------------------
+ section("5. query(writer='worker') — only worker-written entries")
+
+ worker_entries = await ns.query(writer="worker")
+ print(f" Entries written by worker ({len(worker_entries)}):")
+ for e in worker_entries:
+ print(f" key={e.current.key!r:15s} v{e.current.version}
tags={list(e.current.tags)}")
+
+ for e in worker_entries:
+ assert e.current.writer == "worker", f"Expected writer='worker', got
{e.current.writer!r}"
+ print(" All returned entries are from worker ✓")
+
+ # Also verify coordinator entries are NOT in the result
+ coordinator_entries = await ns.query(writer="coordinator")
+ print(f"\n Entries written by coordinator ({len(coordinator_entries)}):")
+ for e in coordinator_entries:
+ print(f" key={e.current.key!r:15s} v{e.current.version}")
+
+ # -----------------------------------------------------------------------
+ # 6. merge() — version conflict resolution
+ # -----------------------------------------------------------------------
+ section("6. merge() — last-write-wins by version number")
+
+ remote_store = ContextStore()
+
+ # Remote has a HIGHER version of "status" → should win
+ # We manually set version by writing multiple times
+ await remote_store.put("status", "archived",
writer="archiver", namespace="data_pipeline_v1")
+ await remote_store.put("status", "archived_v2",
writer="archiver", namespace="data_pipeline_v1")
+ await remote_store.put("archived_at", "2026-02-18T19:00:00",
writer="archiver", namespace="data_pipeline_v1")
+
+ remote_status_v = (await remote_store.get_entry("status",
"data_pipeline_v1")).current.version
+ local_status_v = (await store.get_entry("status",
"data_pipeline_v1")).current.version
+ print(f" Before merge: local status v{local_status_v}, remote status
v{remote_status_v}")
+
+ merged_count = await store.merge(remote_store, "data_pipeline_v1")
+ print(f" merge() accepted {merged_count} entry/entries from remote")
+
+ merged_status = await store.get("status", "data_pipeline_v1")
+ print(f" status after merge → {merged_status!r}")
+
+ if remote_status_v > local_status_v:
+ assert merged_status == "archived_v2", "Higher-version remote value
should have won"
+ print(" Higher remote version won ✓")
+ else:
+ print(" Local version was already higher; remote was ignored ✓")
+
+ # -----------------------------------------------------------------------
+ # 7. to_json() — full serialisation
+ # -----------------------------------------------------------------------
+ section("7. to_json() — full store serialisation")
+
+ json_str = await store.to_json()
+ parsed = json.loads(json_str)
+
+ print(f" Serialised namespaces: {list(parsed['entries'].keys())}")
+ print(f" Channels captured: {list(parsed['channels'].keys())}")
+ print(f" Subscriptions: {list(parsed['subscriptions'].keys())}")
+
+ # Verify roundtrip: all namespace keys appear in JSON
+ local_keys = set(await ns.keys())
+ json_keys = set(parsed["entries"].get("data_pipeline_v1", {}).keys())
+ assert local_keys == json_keys, f"JSON keys mismatch: {local_keys} vs
{json_keys}"
+ print(" Key set roundtrip matches ✓")
+
+ # Print a condensed preview
+ condensed = {
+ "entries_count": {
+ ns_name: len(keys)
+ for ns_name, keys in parsed["entries"].items()
+ },
+ "channels": parsed["channels"],
+ }
+ print("\n Condensed summary:")
+ print(json.dumps(condensed, indent=4, ensure_ascii=False))
+
+ # -----------------------------------------------------------------------
+ # 8. Final stats
+ # -----------------------------------------------------------------------
+ section("8. Store stats")
+
+ stats = await store.stats()
+ print(json.dumps(stats, indent=2, ensure_ascii=False))
+
+ # -----------------------------------------------------------------------
+ # 9. Monitor change log summary
+ # -----------------------------------------------------------------------
+ section("9. Monitor pub/sub change log summary")
+
+ print(f" Total changes observed by monitor: {len(_change_log)}")
+ for i, entry in enumerate(_change_log, 1):
+ print(f" {i:2d}. [{entry['writer']:15s}] {entry['key']!r}")
+
+ section("Demo complete — all scenarios passed ✓")
+
+
+if __name__ == "__main__":
+ asyncio.run(main())