This is an automated email from the ASF dual-hosted git repository. chenliang613 pushed a commit to branch feature/shared-context in repository https://gitbox.apache.org/repos/asf/carbondata.git
commit f6c7656f839228ba7fe417b1bdb59c8bbf2d12f3
Author: chenliang <[email protected]>
AuthorDate: Wed Feb 18 11:02:22 2026 +0800

    Add shared context and pub/sub bridge for multi-agent collaboration

    Introduces SharedContext (namespace-scoped key/value store with TTL,
    versioning, merge, and snapshot) and ContextBridge (pub/sub layer that
    notifies subscribers on writes) so multiple agents can read, write, and
    react to shared state. SharedContext_Demo shows coordinator, worker, and
    monitor agents collaborating end-to-end.

    Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
---
 Agent_module/ContextBridge.python      | 187 ++++++++++++++++++++++++++
 Agent_module/SharedContext.python      | 219 ++++++++++++++++++++++++++++++
 Agent_module/SharedContext_Demo.python | 234 +++++++++++++++++++++++++++++++++
 3 files changed, 640 insertions(+)

diff --git a/Agent_module/ContextBridge.python b/Agent_module/ContextBridge.python
new file mode 100644
index 0000000000..9cb595a427
--- /dev/null
+++ b/Agent_module/ContextBridge.python
@@ -0,0 +1,187 @@
+from dataclasses import dataclass, field
+from typing import Any, Callable, Dict, List, Optional
+from datetime import datetime
+import logging
+
+# Load SharedContext from the same directory.
+# .python extension requires importlib to load by file path.
+import sys, os, importlib.util as _ilu
+
+def _load(name: str):
+    """Load ``<name>.python`` from this directory as module *name*.
+
+    An already-loaded module is reused so that every importer shares the same
+    class objects (re-executing the file would create distinct types).
+    """
+    if name in sys.modules:
+        return sys.modules[name]
+    from importlib.machinery import SourceFileLoader
+    here = os.path.dirname(os.path.abspath(__file__))
+    path = os.path.join(here, f"{name}.python")
+    loader = SourceFileLoader(name, path)
+    spec = _ilu.spec_from_loader(name, loader)
+    mod = _ilu.module_from_spec(spec)
+    sys.modules[name] = mod
+    try:
+        loader.exec_module(mod)
+    except BaseException:
+        # Don't leave a half-initialised module behind for the cache to return.
+        sys.modules.pop(name, None)
+        raise
+    return mod
+
+_sc = _load("SharedContext")
+SharedContext = _sc.SharedContext
+ContextEntry = _sc.ContextEntry
+
+_logger = logging.getLogger(__name__)
+
+# ============================================================================
+# Pub/Sub Bridge
+# ============================================================================
+
+@dataclass
+class ContextSubscription:
+    """Subscription record: which agent watches which keys."""
+    agent_id: str
+    keys: List[str]  # empty list = subscribe to ALL keys
+    callback: Callable  # called as callback(key, entry) on change
+
+
+class ContextBridge:
+    """
+    Connects agents to a SharedContext and fires callbacks on writes.
+
+    Usage pattern:
+        bridge = ContextBridge(context)
+        bridge.register_agent("monitor", on_change)  # subscribe to all
+        bridge.write("coordinator", "task", {"id": 1})
+    """
+
+    def __init__(self, context: SharedContext):
+        self._context = context
+        self._subscriptions: Dict[str, ContextSubscription] = {}
+        self._access_log: List[Dict[str, Any]] = []
+
+    # ------------------------------------------------------------------
+    # Agent registration
+    # ------------------------------------------------------------------
+
+    def register_agent(
+        self,
+        agent_id: str,
+        callback: Callable,
+        keys: Optional[List[str]] = None,
+    ) -> None:
+        """Subscribe *agent_id* to context changes on *keys* (all if empty).
+
+        Re-registering the same *agent_id* replaces its previous subscription.
+        """
+        self._subscriptions[agent_id] = ContextSubscription(
+            agent_id=agent_id,
+            keys=list(keys) if keys else [],  # copy: caller may mutate its list
+            callback=callback,
+        )
+
+    def unregister_agent(self, agent_id: str) -> None:
+        """Remove the subscription for *agent_id*."""
+        self._subscriptions.pop(agent_id, None)
+
+    # ------------------------------------------------------------------
+    # Writes
+    # ------------------------------------------------------------------
+
+    def write(
+        self,
+        agent_id: str,
+        key: str,
+        value: Any,
+        ttl: Optional[int] = None,
+        tags: Optional[List[str]] = None,
+    ) -> ContextEntry:
+        """Write *key* → *value* on behalf of *agent_id*, then notify."""
+        entry = self._context.set(key, value, agent_id, ttl=ttl, tags=tags)
+        self._notify(key, entry)
+        return entry
+
+    def broadcast(
+        self,
+        agent_id: str,
+        data: Dict[str, Any],
+        ttl: Optional[int] = None,
+        tags: Optional[List[str]] = None,
+    ) -> List[ContextEntry]:
+        """Write every key in *data*, then notify once all writes are done.
+
+        Notifications are not interleaved with the writes, but the batch is not
+        transactional: if a write raises, keys written before it are kept and
+        no notifications are sent.
+        """
+        written: List[ContextEntry] = []
+        for key, value in data.items():
+            entry = self._context.set(key, value, agent_id, ttl=ttl, tags=tags)
+            written.append(entry)
+        # Notify after all entries are committed
+        for entry in written:
+            self._notify(entry.key, entry)
+        return written
+
+    # ------------------------------------------------------------------
+    # Reads
+    # ------------------------------------------------------------------
+
+    def read(self, agent_id: str, key: str) -> Optional[Any]:
+        """Read *key* on behalf of *agent_id*; access is logged."""
+        value = self._context.get(key)
+        self._log_access(agent_id, "read", key)
+        return value
+
+    def read_all(self, agent_id: str) -> Dict[str, Any]:
+        """Return a snapshot of all non-expired entries visible to *agent_id*."""
+        self._log_access(agent_id, "read_all", "*")
+        return self._context.snapshot()
+
+    # ------------------------------------------------------------------
+    # Internal helpers
+    # ------------------------------------------------------------------
+
+    def _notify(self, changed_key: str, entry: ContextEntry) -> None:
+        """Fire callbacks for every subscriber that watches *changed_key*.
+
+        The writer is never notified about its own write. A callback that
+        raises is logged and skipped so it cannot break the bridge or starve
+        the remaining subscribers.
+        """
+        # Iterate over a copy: a callback may (un)register agents, and resizing
+        # the dict mid-iteration would raise RuntimeError outside the try below.
+        for sub in list(self._subscriptions.values()):
+            # Skip the writer itself (no self-notification)
+            if sub.agent_id == entry.agent_id:
+                continue
+            if sub.keys and changed_key not in sub.keys:
+                continue
+            try:
+                sub.callback(changed_key, entry)
+            except Exception:
+                # Don't let a bad callback break the bridge, but don't hide it.
+                _logger.exception(
+                    "Callback for agent %r failed on key %r",
+                    sub.agent_id,
+                    changed_key,
+                )
+
+    def _log_access(self, agent_id: str, operation: str, key: str) -> None:
+        self._access_log.append({
+            "agent_id": agent_id,
+            "operation": operation,
+            "key": key,
+            "timestamp": datetime.now().isoformat(),
+        })
+
+    # ------------------------------------------------------------------
+    # Diagnostics
+    # ------------------------------------------------------------------
+
+    def access_log(self) -> List[Dict[str, Any]]:
+        """Return a copy of the access log."""
+        return list(self._access_log)
diff --git a/Agent_module/SharedContext.python b/Agent_module/SharedContext.python
new file mode 100644
index 0000000000..e7eb96291f
--- /dev/null
+++ b/Agent_module/SharedContext.python
@@ -0,0 +1,219 @@
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Optional
+from datetime import datetime
+import copy
+import json
+
+# ============================================================================
+# Core Data Models — Shared Context
+# NOTE: SharedContext is NOT thread-safe by default. For concurrent access,
+# callers must provide external locking (e.g., threading.Lock).
+# ============================================================================
+
+@dataclass
+class ContextEntry:
+    """A single key/value entry written by an agent into the shared context."""
+    key: str
+    value: Any
+    agent_id: str
+    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
+    version: int = 1
+    ttl: Optional[int] = None  # seconds until expiry; None = forever
+    tags: List[str] = field(default_factory=list)
+
+    def is_expired(self) -> bool:
+        """Return True if this entry has passed its TTL."""
+        if self.ttl is None:
+            return False
+        written_at = datetime.fromisoformat(self.timestamp)
+        elapsed = (datetime.now() - written_at).total_seconds()
+        return elapsed > self.ttl
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            "key": self.key,
+            "value": self.value,
+            "agent_id": self.agent_id,
+            "timestamp": self.timestamp,
+            "version": self.version,
+            "ttl": self.ttl,
+            "tags": self.tags,
+        }
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> "ContextEntry":
+        return cls(
+            key=data["key"],
+            value=data["value"],
+            agent_id=data["agent_id"],
+            timestamp=data.get("timestamp", datetime.now().isoformat()),
+            version=data.get("version", 1),
+            ttl=data.get("ttl"),
+            tags=data.get("tags", []),
+        )
+
+
+@dataclass
+class SharedContext:
+    """
+    A namespace-scoped shared memory store for multiple agents.
+
+    Agents read and write ContextEntry objects keyed by string.
+    Version numbers enable last-write-wins conflict resolution during merge.
+
+    NOTE: Not thread-safe. Use external locking for concurrent access.
+    """
+    namespace: str
+    entries: Dict[str, ContextEntry] = field(default_factory=dict)
+    version: int = 0
+    created_at: str = field(default_factory=lambda: datetime.now().isoformat())
+    updated_at: str = field(default_factory=lambda: datetime.now().isoformat())
+
+    # ------------------------------------------------------------------
+    # Write
+    # ------------------------------------------------------------------
+
+    def set(
+        self,
+        key: str,
+        value: Any,
+        agent_id: str,
+        ttl: Optional[int] = None,
+        tags: Optional[List[str]] = None,
+    ) -> ContextEntry:
+        """Write a value. Bumps the entry version and the store-level version."""
+        existing = self.entries.get(key)
+        entry_version = (existing.version + 1) if existing else 1
+
+        entry = ContextEntry(
+            key=key,
+            value=value,
+            agent_id=agent_id,
+            version=entry_version,
+            ttl=ttl,
+            tags=list(tags) if tags else [],  # copy: caller may mutate its list
+        )
+        self.entries[key] = entry
+        self.version += 1
+        self.updated_at = datetime.now().isoformat()
+        return entry
+
+    # ------------------------------------------------------------------
+    # Read
+    # ------------------------------------------------------------------
+
+    def get(self, key: str) -> Optional[Any]:
+        """Return the value for *key*, or None if missing or TTL-expired."""
+        # Delegate so expiry handling lives in exactly one place.
+        entry = self.get_entry(key)
+        return None if entry is None else entry.value
+
+    def get_entry(self, key: str) -> Optional[ContextEntry]:
+        """Return the full ContextEntry for *key*, or None if missing/expired.
+
+        An expired entry is removed from the store as a side effect.
+        """
+        entry = self.entries.get(key)
+        if entry is None:
+            return None
+        if entry.is_expired():
+            del self.entries[key]
+            return None
+        return entry
+
+    # ------------------------------------------------------------------
+    # Delete
+    # ------------------------------------------------------------------
+
+    def delete(self, key: str, agent_id: str) -> bool:
+        """Remove *key* from the store. Returns True if it existed.
+
+        *agent_id* identifies the caller but is currently not recorded.
+        """
+        if key in self.entries:
+            del self.entries[key]
+            self.version += 1
+            self.updated_at = datetime.now().isoformat()
+            return True
+        return False
+
+    # ------------------------------------------------------------------
+    # Iteration helpers
+    # ------------------------------------------------------------------
+
+    def keys(self) -> List[str]:
+        """Return all non-expired keys."""
+        self.cleanup_expired()
+        return list(self.entries.keys())
+
+    def items(self) -> List[tuple]:
+        """Return all non-expired (key, value) pairs."""
+        self.cleanup_expired()
+        return [(k, e.value) for k, e in self.entries.items()]
+
+    # ------------------------------------------------------------------
+    # Snapshot / merge
+    # ------------------------------------------------------------------
+
+    def snapshot(self) -> Dict[str, Any]:
+        """Return a deep copy of the current (non-expired) entries as a dict."""
+        self.cleanup_expired()
+        return copy.deepcopy({k: e.to_dict() for k, e in self.entries.items()})
+
+    def merge(self, other: "SharedContext") -> None:
+        """
+        Merge *other* into this context using last-write-wins by entry version.
+        Only entries with a strictly higher version number replace existing
+        ones; on a tie the local entry is kept. Winning entries are deep-copied.
+        """
+        for key, other_entry in other.entries.items():
+            existing = self.entries.get(key)
+            if existing is None or other_entry.version > existing.version:
+                self.entries[key] = copy.deepcopy(other_entry)
+        self.version = max(self.version, other.version)
+        self.updated_at = datetime.now().isoformat()
+
+    # ------------------------------------------------------------------
+    # Maintenance
+    # ------------------------------------------------------------------
+
+    def cleanup_expired(self) -> List[str]:
+        """Remove all TTL-expired entries. Returns the list of removed keys."""
+        expired = [k for k, e in self.entries.items() if e.is_expired()]
+        for k in expired:
+            del self.entries[k]
+        if expired:
+            self.updated_at = datetime.now().isoformat()
+        return expired
+
+    # ------------------------------------------------------------------
+    # Serialisation
+    # ------------------------------------------------------------------
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            "namespace": self.namespace,
+            "entries": {k: e.to_dict() for k, e in self.entries.items()},
+            "version": self.version,
+            "created_at": self.created_at,
+            "updated_at": self.updated_at,
+        }
+
+    def to_json(self, indent: int = 2) -> str:
+        return json.dumps(self.to_dict(), indent=indent, ensure_ascii=False)
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> "SharedContext":
+        ctx = cls(
+            namespace=data["namespace"],
+            version=data.get("version", 0),
+            created_at=data.get("created_at", datetime.now().isoformat()),
+            updated_at=data.get("updated_at", datetime.now().isoformat()),
+        )
+        for key, entry_data in data.get("entries", {}).items():
+            ctx.entries[key] = ContextEntry.from_dict(entry_data)
+        return ctx
+
+    @classmethod
+    def from_json(cls, json_str: str) -> "SharedContext":
+        return cls.from_dict(json.loads(json_str))
diff --git a/Agent_module/SharedContext_Demo.python b/Agent_module/SharedContext_Demo.python
new file mode 100644
index 0000000000..484edc1ea3
--- /dev/null
+++ b/Agent_module/SharedContext_Demo.python
@@ -0,0 +1,234 @@
+"""
+SharedContext_Demo.python
+=========================
+End-to-end demonstration of multi-agent shared memory.
+
+Scenario
+--------
+  coordinator — assigns a task, sets status to "pending"
+  worker      — picks up the task, processes it, writes result back
+  monitor     — passive observer; callback fires on every change
+
+Run:
+  python3 Agent_module/SharedContext_Demo.python
+"""
+
+import sys, os, json, time, importlib.util as _ilu
+
+def _load(name: str):
+    """Load ``<name>.python`` from this directory as module *name*.
+
+    An already-loaded module is reused so that every importer shares the same
+    class objects (re-executing the file would create distinct types).
+    """
+    if name in sys.modules:
+        return sys.modules[name]
+    from importlib.machinery import SourceFileLoader
+    here = os.path.dirname(os.path.abspath(__file__))
+    path = os.path.join(here, f"{name}.python")
+    loader = SourceFileLoader(name, path)
+    spec = _ilu.spec_from_loader(name, loader)
+    mod = _ilu.module_from_spec(spec)
+    sys.modules[name] = mod
+    try:
+        loader.exec_module(mod)
+    except BaseException:
+        # Don't leave a half-initialised module behind for the cache to return.
+        sys.modules.pop(name, None)
+        raise
+    return mod
+
+_sc = _load("SharedContext")
+_cb = _load("ContextBridge")
+
+SharedContext = _sc.SharedContext
+ContextEntry = _sc.ContextEntry
+ContextBridge = _cb.ContextBridge
+
+# ============================================================================
+# Helper: pretty section headers
+# ============================================================================
+
+def section(title: str) -> None:
+    print("\n" + "=" * 70)
+    print(f" {title}")
+    print("=" * 70)
+
+
+# ============================================================================
+# Monitor callback — fires whenever a subscribed key changes
+# ============================================================================
+
+_change_log: list = []  # record all changes for summary at the end
+
+def monitor_callback(key: str, entry: ContextEntry) -> None:
+    msg = (
+        f"[MONITOR] key={key!r:20s} "
+        f"agent={entry.agent_id:15s} "
+        f"value={str(entry.value)!r:.60s}"
+    )
+    print(msg)
+    _change_log.append({"key": key, "agent_id": entry.agent_id, "value": entry.value})
+
+
+# ============================================================================
+# Main demo
+# ============================================================================
+
+def main() -> None:
+
+    # ------------------------------------------------------------------
+    # 1. Create SharedContext and wrap in ContextBridge
+    # ------------------------------------------------------------------
+    section("1. Setup — SharedContext + ContextBridge")
+
+    ctx = SharedContext(namespace="data_pipeline_v1")
+    bridge = ContextBridge(ctx)
+
+    # ------------------------------------------------------------------
+    # 2. Register agents
+    # ------------------------------------------------------------------
+    section("2. Agent registration")
+
+    # Monitor subscribes to ALL keys (empty keys list)
+    bridge.register_agent("monitor", monitor_callback, keys=[])
+
+    # Coordinator and worker subscribe to keys they care about,
+    # but we skip giving them callbacks in this demo (they act, not react).
+    bridge.register_agent("coordinator", lambda k, e: None, keys=["result"])
+    bridge.register_agent("worker", lambda k, e: None, keys=["task", "status"])
+
+    print(" Registered: coordinator, worker, monitor")
+
+    # ------------------------------------------------------------------
+    # 3. Coordinator writes task + initial status
+    # ------------------------------------------------------------------
+    section("3. Coordinator assigns task")
+
+    bridge.write(
+        agent_id="coordinator",
+        key="task",
+        value={
+            "id": "task_001",
+            "description": "Analyse sales data for Q1",
+            "priority": "high",
+        },
+    )
+    bridge.write(agent_id="coordinator", key="status", value="pending")
+    bridge.write(
+        agent_id="coordinator",
+        key="deadline",
+        value="2026-02-18T18:00:00",
+        tags=["meta"],
+    )
+
+    # ------------------------------------------------------------------
+    # 4. Worker reads task, updates status, writes result
+    # ------------------------------------------------------------------
+    section("4. Worker processes task")
+
+    task = bridge.read("worker", "task")
+    print(f" Worker received task: {task}")
+
+    bridge.write(agent_id="worker", key="status", value="in_progress")
+
+    # Simulate processing
+    print(" Worker processing …")
+    time.sleep(0.05)
+
+    bridge.write(
+        agent_id="worker",
+        key="result",
+        value={
+            "task_id": task["id"],
+            "summary": "Q1 revenue up 12 % YoY; top SKU: widget-X",
+            "rows_processed": 14_320,
+        },
+    )
+    bridge.write(agent_id="worker", key="status", value="completed")
+
+    # ------------------------------------------------------------------
+    # 5. Short-lived TTL entry (expires in 2 seconds)
+    # ------------------------------------------------------------------
+    section("5. TTL demo — heartbeat entry (TTL = 2 s)")
+
+    bridge.write(
+        agent_id="worker",
+        key="heartbeat",
+        value={"alive": True, "load": 0.42},
+        ttl=2,
+        tags=["health"],
+    )
+    print(" heartbeat written (TTL=2s)")
+    print(f" Immediate read → {bridge.read('coordinator', 'heartbeat')}")
+
+    print(" Sleeping 3 s …")
+    time.sleep(3)
+
+    expired_val = bridge.read("coordinator", "heartbeat")
+    print(f" After 3 s read → {expired_val} (None = expired correctly)")
+
+    # The expired read above already purged "heartbeat", so this returns [].
+    removed = ctx.cleanup_expired()
+    print(f" cleanup_expired() removed: {removed}")
+
+    # ------------------------------------------------------------------
+    # 6. Batched broadcast (notifications sent after all writes)
+    # ------------------------------------------------------------------
+    section("6. Batched broadcast by coordinator")
+
+    bridge.broadcast(
+        agent_id="coordinator",
+        data={
+            "audit_approved": True,
+            "reviewer": "agent_audit_bot",
+        },
+        tags=["audit"],
+    )
+
+    # ------------------------------------------------------------------
+    # 7. Full context snapshot
+    # ------------------------------------------------------------------
+    section("7. Full context snapshot (read_all)")
+
+    snapshot = bridge.read_all("monitor")
+    print(json.dumps(snapshot, indent=2, ensure_ascii=False))
+
+    # ------------------------------------------------------------------
+    # 8. Merge demo
+    # ------------------------------------------------------------------
+    section("8. Merge — remote snapshot into local context")
+
+    remote_ctx = SharedContext(namespace="data_pipeline_v1")
+    # merge() only replaces an entry whose version is strictly higher, so the
+    # remote "status" must be written more often than the local one was.
+    local_status_version = ctx.get_entry("status").version
+    for _ in range(local_status_version + 1):
+        remote_ctx.set("status", "archived", "archiver")
+    remote_ctx.set("archived_at", "2026-02-18T19:00:00", "archiver")
+
+    ctx.merge(remote_ctx)
+    print(" Merged remote context (status overridden by higher version).")
+    print(f" status after merge → {ctx.get('status')!r}")
+
+    # ------------------------------------------------------------------
+    # 9. Export final context to JSON
+    # ------------------------------------------------------------------
+    section("9. Final context — to_json() export")
+
+    print(ctx.to_json())
+
+    # ------------------------------------------------------------------
+    # 10. Monitor change summary
+    # ------------------------------------------------------------------
+    section("10. Monitor change log summary")
+
+    print(f" Total changes observed by monitor: {len(_change_log)}")
+    for i, entry in enumerate(_change_log, 1):
+        print(f" {i:2d}. [{entry['agent_id']:15s}] {entry['key']}")
+
+    section("Demo complete")
+
+
+if __name__ == "__main__":
+    main()
