This is an automated email from the ASF dual-hosted git repository.
chenliang613 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new f6c7656f83 Add shared context and pub/sub bridge for multi-agent
collaboration
f6c7656f83 is described below
commit f6c7656f839228ba7fe417b1bdb59c8bbf2d12f3
Author: chenliang <[email protected]>
AuthorDate: Wed Feb 18 11:02:22 2026 +0800
Add shared context and pub/sub bridge for multi-agent collaboration
Introduces SharedContext (namespace-scoped key/value store with TTL,
versioning, merge, and snapshot) and ContextBridge (pub/sub layer that
notifies subscribers on writes) so multiple agents can read, write, and
react to shared state. SharedContext_Demo shows coordinator, worker, and
monitor agents collaborating end-to-end.
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
---
Agent_module/ContextBridge.python | 152 +++++++++++++++++++++++
Agent_module/SharedContext.python | 216 ++++++++++++++++++++++++++++++++
Agent_module/SharedContext_Demo.python | 217 +++++++++++++++++++++++++++++++++
3 files changed, 585 insertions(+)
diff --git a/Agent_module/ContextBridge.python
b/Agent_module/ContextBridge.python
new file mode 100644
index 0000000000..9cb595a427
--- /dev/null
+++ b/Agent_module/ContextBridge.python
@@ -0,0 +1,152 @@
+from dataclasses import dataclass, field
+from typing import Any, Callable, Dict, List, Optional
+from datetime import datetime
+
+# Load SharedContext from the same directory.
+# .python extension requires importlib to load by file path.
+import sys, os, importlib.util as _ilu
+
+def _load(name: str):
+ from importlib.machinery import SourceFileLoader
+ here = os.path.dirname(os.path.abspath(__file__))
+ path = os.path.join(here, f"{name}.python")
+ loader = SourceFileLoader(name, path)
+ spec = _ilu.spec_from_loader(name, loader)
+ mod = _ilu.module_from_spec(spec)
+ sys.modules[name] = mod
+ loader.exec_module(mod)
+ return mod
+
+_sc = _load("SharedContext")
+SharedContext = _sc.SharedContext
+ContextEntry = _sc.ContextEntry
+
+# ============================================================================
+# Pub/Sub Bridge
+# ============================================================================
+
+@dataclass
+class ContextSubscription:
+ """Subscription record: which agent watches which keys."""
+ agent_id: str
+ keys: List[str] # empty list = subscribe to ALL keys
+ callback: Callable # called as callback(key, entry) on change
+
+
+class ContextBridge:
+ """
+ Connects agents to a SharedContext and fires callbacks on writes.
+
+ Usage pattern:
+ bridge = ContextBridge(context)
+ bridge.register_agent("monitor", on_change) # subscribe to all
+ bridge.write("coordinator", "task", {"id": 1})
+ """
+
+ def __init__(self, context: SharedContext):
+ self._context = context
+ self._subscriptions: Dict[str, ContextSubscription] = {}
+ self._access_log: List[Dict[str, Any]] = []
+
+ # ------------------------------------------------------------------
+ # Agent registration
+ # ------------------------------------------------------------------
+
+ def register_agent(
+ self,
+ agent_id: str,
+ callback: Callable,
+ keys: Optional[List[str]] = None,
+ ) -> None:
+ """Subscribe *agent_id* to context changes on *keys* (all if empty)."""
+ self._subscriptions[agent_id] = ContextSubscription(
+ agent_id=agent_id,
+ keys=keys or [],
+ callback=callback,
+ )
+
+ def unregister_agent(self, agent_id: str) -> None:
+ """Remove the subscription for *agent_id*."""
+ self._subscriptions.pop(agent_id, None)
+
+ # ------------------------------------------------------------------
+ # Writes
+ # ------------------------------------------------------------------
+
+ def write(
+ self,
+ agent_id: str,
+ key: str,
+ value: Any,
+ ttl: Optional[int] = None,
+ tags: Optional[List[str]] = None,
+ ) -> ContextEntry:
+ """Write *key* → *value* on behalf of *agent_id*, then notify."""
+ entry = self._context.set(key, value, agent_id, ttl=ttl, tags=tags)
+ self._notify(key, entry)
+ return entry
+
+ def broadcast(
+ self,
+ agent_id: str,
+ data: Dict[str, Any],
+ ttl: Optional[int] = None,
+ tags: Optional[List[str]] = None,
+ ) -> List[ContextEntry]:
+ """Write multiple keys atomically (no interleaved notifications)."""
+ written: List[ContextEntry] = []
+ for key, value in data.items():
+ entry = self._context.set(key, value, agent_id, ttl=ttl, tags=tags)
+ written.append(entry)
+ # Notify after all entries are committed
+ for entry in written:
+ self._notify(entry.key, entry)
+ return written
+
+ # ------------------------------------------------------------------
+ # Reads
+ # ------------------------------------------------------------------
+
+ def read(self, agent_id: str, key: str) -> Optional[Any]:
+ """Read *key* on behalf of *agent_id*; access is logged."""
+ value = self._context.get(key)
+ self._log_access(agent_id, "read", key)
+ return value
+
+ def read_all(self, agent_id: str) -> Dict[str, Any]:
+ """Return a snapshot of all non-expired entries visible to
*agent_id*."""
+ self._log_access(agent_id, "read_all", "*")
+ return self._context.snapshot()
+
+ # ------------------------------------------------------------------
+ # Internal helpers
+ # ------------------------------------------------------------------
+
+ def _notify(self, changed_key: str, entry: ContextEntry) -> None:
+ """Fire callbacks for every subscriber that watches *changed_key*."""
+ for sub in self._subscriptions.values():
+ # Skip the writer itself (no self-notification)
+ if sub.agent_id == entry.agent_id:
+ continue
+ if sub.keys and changed_key not in sub.keys:
+ continue
+ try:
+ sub.callback(changed_key, entry)
+ except Exception:
+ pass # Don't let a bad callback break the bridge
+
+ def _log_access(self, agent_id: str, operation: str, key: str) -> None:
+ self._access_log.append({
+ "agent_id": agent_id,
+ "operation": operation,
+ "key": key,
+ "timestamp": datetime.now().isoformat(),
+ })
+
+ # ------------------------------------------------------------------
+ # Diagnostics
+ # ------------------------------------------------------------------
+
+ def access_log(self) -> List[Dict[str, Any]]:
+ """Return a copy of the access log."""
+ return list(self._access_log)
diff --git a/Agent_module/SharedContext.python
b/Agent_module/SharedContext.python
new file mode 100644
index 0000000000..e7eb96291f
--- /dev/null
+++ b/Agent_module/SharedContext.python
@@ -0,0 +1,216 @@
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Optional
+from datetime import datetime
+import copy
+import json
+
+# ============================================================================
+# Core Data Models — Shared Context
+# NOTE: SharedContext is NOT thread-safe by default. For concurrent access,
+# callers must provide external locking (e.g., threading.Lock).
+# ============================================================================
+
+@dataclass
+class ContextEntry:
+ """A single key/value entry written by an agent into the shared context."""
+ key: str
+ value: Any
+ agent_id: str
+ timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
+ version: int = 1
+ ttl: Optional[int] = None # seconds until expiry; None = forever
+ tags: List[str] = field(default_factory=list)
+
+ def is_expired(self) -> bool:
+ """Return True if this entry has passed its TTL."""
+ if self.ttl is None:
+ return False
+ written_at = datetime.fromisoformat(self.timestamp)
+ elapsed = (datetime.now() - written_at).total_seconds()
+ return elapsed > self.ttl
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "key": self.key,
+ "value": self.value,
+ "agent_id": self.agent_id,
+ "timestamp": self.timestamp,
+ "version": self.version,
+ "ttl": self.ttl,
+ "tags": self.tags,
+ }
+
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> "ContextEntry":
+ return cls(
+ key=data["key"],
+ value=data["value"],
+ agent_id=data["agent_id"],
+ timestamp=data.get("timestamp", datetime.now().isoformat()),
+ version=data.get("version", 1),
+ ttl=data.get("ttl"),
+ tags=data.get("tags", []),
+ )
+
+
+@dataclass
+class SharedContext:
+ """
+ A namespace-scoped shared memory store for multiple agents.
+
+ Agents read and write ContextEntry objects keyed by string.
+ Version numbers enable last-write-wins conflict resolution during merge.
+
+ NOTE: Not thread-safe. Use external locking for concurrent access.
+ """
+ namespace: str
+ entries: Dict[str, ContextEntry] = field(default_factory=dict)
+ version: int = 0
+ created_at: str = field(default_factory=lambda: datetime.now().isoformat())
+ updated_at: str = field(default_factory=lambda: datetime.now().isoformat())
+
+ # ------------------------------------------------------------------
+ # Write
+ # ------------------------------------------------------------------
+
+ def set(
+ self,
+ key: str,
+ value: Any,
+ agent_id: str,
+ ttl: Optional[int] = None,
+ tags: Optional[List[str]] = None,
+ ) -> ContextEntry:
+ """Write a value. Bumps the store-level version counter."""
+ existing = self.entries.get(key)
+ entry_version = (existing.version + 1) if existing else 1
+
+ entry = ContextEntry(
+ key=key,
+ value=value,
+ agent_id=agent_id,
+ version=entry_version,
+ ttl=ttl,
+ tags=tags or [],
+ )
+ self.entries[key] = entry
+ self.version += 1
+ self.updated_at = datetime.now().isoformat()
+ return entry
+
+ # ------------------------------------------------------------------
+ # Read
+ # ------------------------------------------------------------------
+
+ def get(self, key: str) -> Optional[Any]:
+ """Return the value for *key*, or None if missing or TTL-expired."""
+ entry = self.entries.get(key)
+ if entry is None:
+ return None
+ if entry.is_expired():
+ del self.entries[key]
+ return None
+ return entry.value
+
+ def get_entry(self, key: str) -> Optional[ContextEntry]:
+ """Return the full ContextEntry for *key*, or None if
missing/expired."""
+ entry = self.entries.get(key)
+ if entry is None:
+ return None
+ if entry.is_expired():
+ del self.entries[key]
+ return None
+ return entry
+
+ # ------------------------------------------------------------------
+ # Delete
+ # ------------------------------------------------------------------
+
+ def delete(self, key: str, agent_id: str) -> bool:
+ """Remove *key* from the store. Returns True if it existed."""
+ if key in self.entries:
+ del self.entries[key]
+ self.version += 1
+ self.updated_at = datetime.now().isoformat()
+ return True
+ return False
+
+ # ------------------------------------------------------------------
+ # Iteration helpers
+ # ------------------------------------------------------------------
+
+ def keys(self) -> List[str]:
+ """Return all non-expired keys."""
+ self.cleanup_expired()
+ return list(self.entries.keys())
+
+ def items(self) -> List[tuple]:
+ """Return all non-expired (key, value) pairs."""
+ self.cleanup_expired()
+ return [(k, e.value) for k, e in self.entries.items()]
+
+ # ------------------------------------------------------------------
+ # Snapshot / merge
+ # ------------------------------------------------------------------
+
+ def snapshot(self) -> Dict[str, Any]:
+ """Return a deep copy of the current (non-expired) entries as a
dict."""
+ self.cleanup_expired()
+ return copy.deepcopy({k: e.to_dict() for k, e in self.entries.items()})
+
+ def merge(self, other: "SharedContext") -> None:
+ """
+ Merge *other* into this context using last-write-wins by entry version.
+ Only entries with a higher version number replace existing ones.
+ """
+ for key, other_entry in other.entries.items():
+ existing = self.entries.get(key)
+ if existing is None or other_entry.version > existing.version:
+ self.entries[key] = copy.deepcopy(other_entry)
+ self.version = max(self.version, other.version)
+ self.updated_at = datetime.now().isoformat()
+
+ # ------------------------------------------------------------------
+ # Maintenance
+ # ------------------------------------------------------------------
+
+ def cleanup_expired(self) -> List[str]:
+ """Remove all TTL-expired entries. Returns the list of removed
keys."""
+ expired = [k for k, e in self.entries.items() if e.is_expired()]
+ for k in expired:
+ del self.entries[k]
+ if expired:
+ self.updated_at = datetime.now().isoformat()
+ return expired
+
+ # ------------------------------------------------------------------
+ # Serialisation
+ # ------------------------------------------------------------------
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "namespace": self.namespace,
+ "entries": {k: e.to_dict() for k, e in self.entries.items()},
+ "version": self.version,
+ "created_at": self.created_at,
+ "updated_at": self.updated_at,
+ }
+
+ def to_json(self, indent: int = 2) -> str:
+ return json.dumps(self.to_dict(), indent=indent, ensure_ascii=False)
+
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> "SharedContext":
+ ctx = cls(
+ namespace=data["namespace"],
+ version=data.get("version", 0),
+ created_at=data.get("created_at", datetime.now().isoformat()),
+ updated_at=data.get("updated_at", datetime.now().isoformat()),
+ )
+ for key, entry_data in data.get("entries", {}).items():
+ ctx.entries[key] = ContextEntry.from_dict(entry_data)
+ return ctx
+
+ @classmethod
+ def from_json(cls, json_str: str) -> "SharedContext":
+ return cls.from_dict(json.loads(json_str))
diff --git a/Agent_module/SharedContext_Demo.python
b/Agent_module/SharedContext_Demo.python
new file mode 100644
index 0000000000..484edc1ea3
--- /dev/null
+++ b/Agent_module/SharedContext_Demo.python
@@ -0,0 +1,217 @@
+"""
+SharedContext_Demo.python
+=========================
+End-to-end demonstration of multi-agent shared memory.
+
+Scenario
+--------
+ coordinator — assigns a task, sets status to "pending"
+ worker — picks up the task, processes it, writes result back
+ monitor — passive observer; callback fires on every change
+
+Run:
+ python3 Agent_module/SharedContext_Demo.python
+"""
+
+import sys, os, json, time, importlib.util as _ilu
+
+def _load(name: str):
+ from importlib.machinery import SourceFileLoader
+ here = os.path.dirname(os.path.abspath(__file__))
+ path = os.path.join(here, f"{name}.python")
+ loader = SourceFileLoader(name, path)
+ spec = _ilu.spec_from_loader(name, loader)
+ mod = _ilu.module_from_spec(spec)
+ sys.modules[name] = mod
+ loader.exec_module(mod)
+ return mod
+
+_sc = _load("SharedContext")
+_cb = _load("ContextBridge")
+
+SharedContext = _sc.SharedContext
+ContextEntry = _sc.ContextEntry
+ContextBridge = _cb.ContextBridge
+
+# ============================================================================
+# Helper: pretty section headers
+# ============================================================================
+
+def section(title: str) -> None:
+ print("\n" + "=" * 70)
+ print(f" {title}")
+ print("=" * 70)
+
+
+# ============================================================================
+# Monitor callback — fires whenever a subscribed key changes
+# ============================================================================
+
+_change_log: list = [] # record all changes for summary at the end
+
+def monitor_callback(key: str, entry: ContextEntry) -> None:
+ msg = (
+ f"[MONITOR] key={key!r:20s} "
+ f"agent={entry.agent_id:15s} "
+ f"value={str(entry.value)!r:.60s}"
+ )
+ print(msg)
+ _change_log.append({"key": key, "agent_id": entry.agent_id, "value":
entry.value})
+
+
+# ============================================================================
+# Main demo
+# ============================================================================
+
+def main() -> None:
+
+ # ------------------------------------------------------------------
+ # 1. Create SharedContext and wrap in ContextBridge
+ # ------------------------------------------------------------------
+ section("1. Setup — SharedContext + ContextBridge")
+
+ ctx = SharedContext(namespace="data_pipeline_v1")
+ bridge = ContextBridge(ctx)
+
+ # ------------------------------------------------------------------
+ # 2. Register agents
+ # ------------------------------------------------------------------
+ section("2. Agent registration")
+
+ # Monitor subscribes to ALL keys (empty keys list)
+ bridge.register_agent("monitor", monitor_callback, keys=[])
+
+ # Coordinator and worker subscribe to keys they care about,
+ # but we skip giving them callbacks in this demo (they act, not react).
+ bridge.register_agent("coordinator", lambda k, e: None, keys=["result"])
+ bridge.register_agent("worker", lambda k, e: None, keys=["task",
"status"])
+
+ print(" Registered: coordinator, worker, monitor")
+
+ # ------------------------------------------------------------------
+ # 3. Coordinator writes task + initial status
+ # ------------------------------------------------------------------
+ section("3. Coordinator assigns task")
+
+ bridge.write(
+ agent_id="coordinator",
+ key="task",
+ value={
+ "id": "task_001",
+ "description": "Analyse sales data for Q1",
+ "priority": "high",
+ },
+ )
+ bridge.write(agent_id="coordinator", key="status", value="pending")
+ bridge.write(
+ agent_id="coordinator",
+ key="deadline",
+ value="2026-02-18T18:00:00",
+ tags=["meta"],
+ )
+
+ # ------------------------------------------------------------------
+ # 4. Worker reads task, updates status, writes result
+ # ------------------------------------------------------------------
+ section("4. Worker processes task")
+
+ task = bridge.read("worker", "task")
+ print(f" Worker received task: {task}")
+
+ bridge.write(agent_id="worker", key="status", value="in_progress")
+
+ # Simulate processing
+ print(" Worker processing …")
+ time.sleep(0.05)
+
+ bridge.write(
+ agent_id="worker",
+ key="result",
+ value={
+ "task_id": task["id"],
+ "summary": "Q1 revenue up 12 % YoY; top SKU: widget-X",
+ "rows_processed": 14_320,
+ },
+ )
+ bridge.write(agent_id="worker", key="status", value="completed")
+
+ # ------------------------------------------------------------------
+ # 5. Short-lived TTL entry (expires in 2 seconds)
+ # ------------------------------------------------------------------
+ section("5. TTL demo — heartbeat entry (TTL = 2 s)")
+
+ bridge.write(
+ agent_id="worker",
+ key="heartbeat",
+ value={"alive": True, "load": 0.42},
+ ttl=2,
+ tags=["health"],
+ )
+ print(" heartbeat written (TTL=2s)")
+ print(f" Immediate read → {bridge.read('coordinator', 'heartbeat')}")
+
+ print(" Sleeping 3 s …")
+ time.sleep(3)
+
+ expired_val = bridge.read("coordinator", "heartbeat")
+ print(f" After 3 s read → {expired_val} (None = expired correctly)")
+
+ removed = ctx.cleanup_expired()
+ print(f" cleanup_expired() removed: {removed}")
+
+ # ------------------------------------------------------------------
+ # 6. Atomic broadcast
+ # ------------------------------------------------------------------
+ section("6. Atomic broadcast by coordinator")
+
+ bridge.broadcast(
+ agent_id="coordinator",
+ data={
+ "audit_approved": True,
+ "reviewer": "agent_audit_bot",
+ },
+ tags=["audit"],
+ )
+
+ # ------------------------------------------------------------------
+ # 7. Full context snapshot
+ # ------------------------------------------------------------------
+ section("7. Full context snapshot (read_all)")
+
+ snapshot = bridge.read_all("monitor")
+ print(json.dumps(snapshot, indent=2, ensure_ascii=False))
+
+ # ------------------------------------------------------------------
+ # 8. Merge demo
+ # ------------------------------------------------------------------
+ section("8. Merge — remote snapshot into local context")
+
+ remote_ctx = SharedContext(namespace="data_pipeline_v1")
+ remote_ctx.set("status", "archived", "archiver")
+ remote_ctx.set("archived_at", "2026-02-18T19:00:00", "archiver")
+
+ ctx.merge(remote_ctx)
+ print(" Merged remote context (status overridden by higher version).")
+ print(f" status after merge → {ctx.get('status')!r}")
+
+ # ------------------------------------------------------------------
+ # 9. Export final context to JSON
+ # ------------------------------------------------------------------
+ section("9. Final context — to_json() export")
+
+ print(ctx.to_json())
+
+ # ------------------------------------------------------------------
+ # 10. Monitor change summary
+ # ------------------------------------------------------------------
+ section("10. Monitor change log summary")
+
+ print(f" Total changes observed by monitor: {len(_change_log)}")
+ for i, entry in enumerate(_change_log, 1):
+ print(f" {i:2d}. [{entry['agent_id']:15s}] {entry['key']}")
+
+ section("Demo complete")
+
+
+if __name__ == "__main__":
+ main()