This is an automated email from the ASF dual-hosted git repository.

chenliang613 pushed a commit to branch feature/shared-context
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit f6c7656f839228ba7fe417b1bdb59c8bbf2d12f3
Author:  chenliang <[email protected]>
AuthorDate: Wed Feb 18 11:02:22 2026 +0800

    Add shared context and pub/sub bridge for multi-agent collaboration
    
    Introduces SharedContext (namespace-scoped key/value store with TTL,
    versioning, merge, and snapshot) and ContextBridge (pub/sub layer that
    notifies subscribers on writes) so multiple agents can read, write, and
    react to shared state. SharedContext_Demo shows coordinator, worker, and
    monitor agents collaborating end-to-end.
    
    Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
---
 Agent_module/ContextBridge.python      | 152 +++++++++++++++++++++++
 Agent_module/SharedContext.python      | 216 ++++++++++++++++++++++++++++++++
 Agent_module/SharedContext_Demo.python | 217 +++++++++++++++++++++++++++++++++
 3 files changed, 585 insertions(+)

diff --git a/Agent_module/ContextBridge.python b/Agent_module/ContextBridge.python
new file mode 100644
index 0000000000..9cb595a427
--- /dev/null
+++ b/Agent_module/ContextBridge.python
@@ -0,0 +1,152 @@
+from dataclasses import dataclass, field
+from typing import Any, Callable, Dict, List, Optional
+from datetime import datetime
+
+# Load SharedContext from the same directory.
+# .python extension requires importlib to load by file path.
+import sys, os, importlib.util as _ilu
+
+def _load(name: str):
+    from importlib.machinery import SourceFileLoader
+    here = os.path.dirname(os.path.abspath(__file__))
+    path = os.path.join(here, f"{name}.python")
+    loader = SourceFileLoader(name, path)
+    spec   = _ilu.spec_from_loader(name, loader)
+    mod    = _ilu.module_from_spec(spec)
+    sys.modules[name] = mod
+    loader.exec_module(mod)
+    return mod
+
+_sc = _load("SharedContext")
+SharedContext  = _sc.SharedContext
+ContextEntry   = _sc.ContextEntry
+
+# ============================================================================
+# Pub/Sub Bridge
+# ============================================================================
+
+@dataclass
+class ContextSubscription:
+    """Subscription record: which agent watches which keys."""
+    agent_id: str
+    keys: List[str]          # empty list = subscribe to ALL keys
+    callback: Callable       # called as callback(key, entry) on change
+
+
+class ContextBridge:
+    """
+    Connects agents to a SharedContext and fires callbacks on writes.
+
+    Usage pattern:
+        bridge = ContextBridge(context)
+        bridge.register_agent("monitor", on_change)   # subscribe to all
+        bridge.write("coordinator", "task", {"id": 1})
+    """
+
+    def __init__(self, context: SharedContext):
+        self._context = context
+        self._subscriptions: Dict[str, ContextSubscription] = {}
+        self._access_log: List[Dict[str, Any]] = []
+
+    # ------------------------------------------------------------------
+    # Agent registration
+    # ------------------------------------------------------------------
+
+    def register_agent(
+        self,
+        agent_id: str,
+        callback: Callable,
+        keys: Optional[List[str]] = None,
+    ) -> None:
+        """Subscribe *agent_id* to context changes on *keys* (all if empty)."""
+        self._subscriptions[agent_id] = ContextSubscription(
+            agent_id=agent_id,
+            keys=keys or [],
+            callback=callback,
+        )
+
+    def unregister_agent(self, agent_id: str) -> None:
+        """Remove the subscription for *agent_id*."""
+        self._subscriptions.pop(agent_id, None)
+
+    # ------------------------------------------------------------------
+    # Writes
+    # ------------------------------------------------------------------
+
+    def write(
+        self,
+        agent_id: str,
+        key: str,
+        value: Any,
+        ttl: Optional[int] = None,
+        tags: Optional[List[str]] = None,
+    ) -> ContextEntry:
+        """Write *key* → *value* on behalf of *agent_id*, then notify."""
+        entry = self._context.set(key, value, agent_id, ttl=ttl, tags=tags)
+        self._notify(key, entry)
+        return entry
+
+    def broadcast(
+        self,
+        agent_id: str,
+        data: Dict[str, Any],
+        ttl: Optional[int] = None,
+        tags: Optional[List[str]] = None,
+    ) -> List[ContextEntry]:
+        """Write multiple keys atomically (no interleaved notifications)."""
+        written: List[ContextEntry] = []
+        for key, value in data.items():
+            entry = self._context.set(key, value, agent_id, ttl=ttl, tags=tags)
+            written.append(entry)
+        # Notify after all entries are committed
+        for entry in written:
+            self._notify(entry.key, entry)
+        return written
+
+    # ------------------------------------------------------------------
+    # Reads
+    # ------------------------------------------------------------------
+
+    def read(self, agent_id: str, key: str) -> Optional[Any]:
+        """Read *key* on behalf of *agent_id*; access is logged."""
+        value = self._context.get(key)
+        self._log_access(agent_id, "read", key)
+        return value
+
+    def read_all(self, agent_id: str) -> Dict[str, Any]:
+        """Return a snapshot of all non-expired entries visible to 
*agent_id*."""
+        self._log_access(agent_id, "read_all", "*")
+        return self._context.snapshot()
+
+    # ------------------------------------------------------------------
+    # Internal helpers
+    # ------------------------------------------------------------------
+
+    def _notify(self, changed_key: str, entry: ContextEntry) -> None:
+        """Fire callbacks for every subscriber that watches *changed_key*."""
+        for sub in self._subscriptions.values():
+            # Skip the writer itself (no self-notification)
+            if sub.agent_id == entry.agent_id:
+                continue
+            if sub.keys and changed_key not in sub.keys:
+                continue
+            try:
+                sub.callback(changed_key, entry)
+            except Exception:
+                pass  # Don't let a bad callback break the bridge
+
+    def _log_access(self, agent_id: str, operation: str, key: str) -> None:
+        self._access_log.append({
+            "agent_id": agent_id,
+            "operation": operation,
+            "key": key,
+            "timestamp": datetime.now().isoformat(),
+        })
+
+    # ------------------------------------------------------------------
+    # Diagnostics
+    # ------------------------------------------------------------------
+
+    def access_log(self) -> List[Dict[str, Any]]:
+        """Return a copy of the access log."""
+        return list(self._access_log)
diff --git a/Agent_module/SharedContext.python b/Agent_module/SharedContext.python
new file mode 100644
index 0000000000..e7eb96291f
--- /dev/null
+++ b/Agent_module/SharedContext.python
@@ -0,0 +1,216 @@
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Optional
+from datetime import datetime
+import copy
+import json
+
+# ============================================================================
+# Core Data Models — Shared Context
+# NOTE: SharedContext is NOT thread-safe by default. For concurrent access,
+# callers must provide external locking (e.g., threading.Lock).
+# ============================================================================
+
+@dataclass
+class ContextEntry:
+    """A single key/value entry written by an agent into the shared context."""
+    key: str
+    value: Any
+    agent_id: str
+    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
+    version: int = 1
+    ttl: Optional[int] = None        # seconds until expiry; None = forever
+    tags: List[str] = field(default_factory=list)
+
+    def is_expired(self) -> bool:
+        """Return True if this entry has passed its TTL."""
+        if self.ttl is None:
+            return False
+        written_at = datetime.fromisoformat(self.timestamp)
+        elapsed = (datetime.now() - written_at).total_seconds()
+        return elapsed > self.ttl
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            "key": self.key,
+            "value": self.value,
+            "agent_id": self.agent_id,
+            "timestamp": self.timestamp,
+            "version": self.version,
+            "ttl": self.ttl,
+            "tags": self.tags,
+        }
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> "ContextEntry":
+        return cls(
+            key=data["key"],
+            value=data["value"],
+            agent_id=data["agent_id"],
+            timestamp=data.get("timestamp", datetime.now().isoformat()),
+            version=data.get("version", 1),
+            ttl=data.get("ttl"),
+            tags=data.get("tags", []),
+        )
+
+
+@dataclass
+class SharedContext:
+    """
+    A namespace-scoped shared memory store for multiple agents.
+
+    Agents read and write ContextEntry objects keyed by string.
+    Version numbers enable last-write-wins conflict resolution during merge.
+
+    NOTE: Not thread-safe. Use external locking for concurrent access.
+    """
+    namespace: str
+    entries: Dict[str, ContextEntry] = field(default_factory=dict)
+    version: int = 0
+    created_at: str = field(default_factory=lambda: datetime.now().isoformat())
+    updated_at: str = field(default_factory=lambda: datetime.now().isoformat())
+
+    # ------------------------------------------------------------------
+    # Write
+    # ------------------------------------------------------------------
+
+    def set(
+        self,
+        key: str,
+        value: Any,
+        agent_id: str,
+        ttl: Optional[int] = None,
+        tags: Optional[List[str]] = None,
+    ) -> ContextEntry:
+        """Write a value.  Bumps the store-level version counter."""
+        existing = self.entries.get(key)
+        entry_version = (existing.version + 1) if existing else 1
+
+        entry = ContextEntry(
+            key=key,
+            value=value,
+            agent_id=agent_id,
+            version=entry_version,
+            ttl=ttl,
+            tags=tags or [],
+        )
+        self.entries[key] = entry
+        self.version += 1
+        self.updated_at = datetime.now().isoformat()
+        return entry
+
+    # ------------------------------------------------------------------
+    # Read
+    # ------------------------------------------------------------------
+
+    def get(self, key: str) -> Optional[Any]:
+        """Return the value for *key*, or None if missing or TTL-expired."""
+        entry = self.entries.get(key)
+        if entry is None:
+            return None
+        if entry.is_expired():
+            del self.entries[key]
+            return None
+        return entry.value
+
+    def get_entry(self, key: str) -> Optional[ContextEntry]:
+        """Return the full ContextEntry for *key*, or None if 
missing/expired."""
+        entry = self.entries.get(key)
+        if entry is None:
+            return None
+        if entry.is_expired():
+            del self.entries[key]
+            return None
+        return entry
+
+    # ------------------------------------------------------------------
+    # Delete
+    # ------------------------------------------------------------------
+
+    def delete(self, key: str, agent_id: str) -> bool:
+        """Remove *key* from the store.  Returns True if it existed."""
+        if key in self.entries:
+            del self.entries[key]
+            self.version += 1
+            self.updated_at = datetime.now().isoformat()
+            return True
+        return False
+
+    # ------------------------------------------------------------------
+    # Iteration helpers
+    # ------------------------------------------------------------------
+
+    def keys(self) -> List[str]:
+        """Return all non-expired keys."""
+        self.cleanup_expired()
+        return list(self.entries.keys())
+
+    def items(self) -> List[tuple]:
+        """Return all non-expired (key, value) pairs."""
+        self.cleanup_expired()
+        return [(k, e.value) for k, e in self.entries.items()]
+
+    # ------------------------------------------------------------------
+    # Snapshot / merge
+    # ------------------------------------------------------------------
+
+    def snapshot(self) -> Dict[str, Any]:
+        """Return a deep copy of the current (non-expired) entries as a 
dict."""
+        self.cleanup_expired()
+        return copy.deepcopy({k: e.to_dict() for k, e in self.entries.items()})
+
+    def merge(self, other: "SharedContext") -> None:
+        """
+        Merge *other* into this context using last-write-wins by entry version.
+        Only entries with a higher version number replace existing ones.
+        """
+        for key, other_entry in other.entries.items():
+            existing = self.entries.get(key)
+            if existing is None or other_entry.version > existing.version:
+                self.entries[key] = copy.deepcopy(other_entry)
+        self.version = max(self.version, other.version)
+        self.updated_at = datetime.now().isoformat()
+
+    # ------------------------------------------------------------------
+    # Maintenance
+    # ------------------------------------------------------------------
+
+    def cleanup_expired(self) -> List[str]:
+        """Remove all TTL-expired entries.  Returns the list of removed 
keys."""
+        expired = [k for k, e in self.entries.items() if e.is_expired()]
+        for k in expired:
+            del self.entries[k]
+        if expired:
+            self.updated_at = datetime.now().isoformat()
+        return expired
+
+    # ------------------------------------------------------------------
+    # Serialisation
+    # ------------------------------------------------------------------
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            "namespace": self.namespace,
+            "entries": {k: e.to_dict() for k, e in self.entries.items()},
+            "version": self.version,
+            "created_at": self.created_at,
+            "updated_at": self.updated_at,
+        }
+
+    def to_json(self, indent: int = 2) -> str:
+        return json.dumps(self.to_dict(), indent=indent, ensure_ascii=False)
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> "SharedContext":
+        ctx = cls(
+            namespace=data["namespace"],
+            version=data.get("version", 0),
+            created_at=data.get("created_at", datetime.now().isoformat()),
+            updated_at=data.get("updated_at", datetime.now().isoformat()),
+        )
+        for key, entry_data in data.get("entries", {}).items():
+            ctx.entries[key] = ContextEntry.from_dict(entry_data)
+        return ctx
+
+    @classmethod
+    def from_json(cls, json_str: str) -> "SharedContext":
+        return cls.from_dict(json.loads(json_str))
diff --git a/Agent_module/SharedContext_Demo.python b/Agent_module/SharedContext_Demo.python
new file mode 100644
index 0000000000..484edc1ea3
--- /dev/null
+++ b/Agent_module/SharedContext_Demo.python
@@ -0,0 +1,217 @@
+"""
+SharedContext_Demo.python
+=========================
+End-to-end demonstration of multi-agent shared memory.
+
+Scenario
+--------
+  coordinator  — assigns a task, sets status to "pending"
+  worker       — picks up the task, processes it, writes result back
+  monitor      — passive observer; callback fires on every change
+
+Run:
+    python3 Agent_module/SharedContext_Demo.python
+"""
+
+import sys, os, json, time, importlib.util as _ilu
+
+def _load(name: str):
+    from importlib.machinery import SourceFileLoader
+    here = os.path.dirname(os.path.abspath(__file__))
+    path = os.path.join(here, f"{name}.python")
+    loader = SourceFileLoader(name, path)
+    spec   = _ilu.spec_from_loader(name, loader)
+    mod    = _ilu.module_from_spec(spec)
+    sys.modules[name] = mod
+    loader.exec_module(mod)
+    return mod
+
+_sc  = _load("SharedContext")
+_cb  = _load("ContextBridge")
+
+SharedContext  = _sc.SharedContext
+ContextEntry   = _sc.ContextEntry
+ContextBridge  = _cb.ContextBridge
+
+# ============================================================================
+# Helper: pretty section headers
+# ============================================================================
+
+def section(title: str) -> None:
+    print("\n" + "=" * 70)
+    print(f"  {title}")
+    print("=" * 70)
+
+
+# ============================================================================
+# Monitor callback — fires whenever a subscribed key changes
+# ============================================================================
+
+_change_log: list = []   # record all changes for summary at the end
+
+def monitor_callback(key: str, entry: ContextEntry) -> None:
+    msg = (
+        f"[MONITOR] key={key!r:20s}  "
+        f"agent={entry.agent_id:15s}  "
+        f"value={str(entry.value)!r:.60s}"
+    )
+    print(msg)
+    _change_log.append({"key": key, "agent_id": entry.agent_id, "value": 
entry.value})
+
+
+# ============================================================================
+# Main demo
+# ============================================================================
+
+def main() -> None:
+
+    # ------------------------------------------------------------------
+    # 1. Create SharedContext and wrap in ContextBridge
+    # ------------------------------------------------------------------
+    section("1. Setup — SharedContext + ContextBridge")
+
+    ctx = SharedContext(namespace="data_pipeline_v1")
+    bridge = ContextBridge(ctx)
+
+    # ------------------------------------------------------------------
+    # 2. Register agents
+    # ------------------------------------------------------------------
+    section("2. Agent registration")
+
+    # Monitor subscribes to ALL keys (empty keys list)
+    bridge.register_agent("monitor", monitor_callback, keys=[])
+
+    # Coordinator and worker subscribe to keys they care about,
+    # but we skip giving them callbacks in this demo (they act, not react).
+    bridge.register_agent("coordinator", lambda k, e: None, keys=["result"])
+    bridge.register_agent("worker",      lambda k, e: None, keys=["task", 
"status"])
+
+    print("  Registered: coordinator, worker, monitor")
+
+    # ------------------------------------------------------------------
+    # 3. Coordinator writes task + initial status
+    # ------------------------------------------------------------------
+    section("3. Coordinator assigns task")
+
+    bridge.write(
+        agent_id="coordinator",
+        key="task",
+        value={
+            "id": "task_001",
+            "description": "Analyse sales data for Q1",
+            "priority": "high",
+        },
+    )
+    bridge.write(agent_id="coordinator", key="status", value="pending")
+    bridge.write(
+        agent_id="coordinator",
+        key="deadline",
+        value="2026-02-18T18:00:00",
+        tags=["meta"],
+    )
+
+    # ------------------------------------------------------------------
+    # 4. Worker reads task, updates status, writes result
+    # ------------------------------------------------------------------
+    section("4. Worker processes task")
+
+    task = bridge.read("worker", "task")
+    print(f"  Worker received task: {task}")
+
+    bridge.write(agent_id="worker", key="status", value="in_progress")
+
+    # Simulate processing
+    print("  Worker processing …")
+    time.sleep(0.05)
+
+    bridge.write(
+        agent_id="worker",
+        key="result",
+        value={
+            "task_id": task["id"],
+            "summary": "Q1 revenue up 12 % YoY; top SKU: widget-X",
+            "rows_processed": 14_320,
+        },
+    )
+    bridge.write(agent_id="worker", key="status", value="completed")
+
+    # ------------------------------------------------------------------
+    # 5. Short-lived TTL entry (expires in 2 seconds)
+    # ------------------------------------------------------------------
+    section("5. TTL demo — heartbeat entry (TTL = 2 s)")
+
+    bridge.write(
+        agent_id="worker",
+        key="heartbeat",
+        value={"alive": True, "load": 0.42},
+        ttl=2,
+        tags=["health"],
+    )
+    print("  heartbeat written (TTL=2s)")
+    print(f"  Immediate read  → {bridge.read('coordinator', 'heartbeat')}")
+
+    print("  Sleeping 3 s …")
+    time.sleep(3)
+
+    expired_val = bridge.read("coordinator", "heartbeat")
+    print(f"  After 3 s read  → {expired_val}  (None = expired correctly)")
+
+    removed = ctx.cleanup_expired()
+    print(f"  cleanup_expired() removed: {removed}")
+
+    # ------------------------------------------------------------------
+    # 6. Atomic broadcast
+    # ------------------------------------------------------------------
+    section("6. Atomic broadcast by coordinator")
+
+    bridge.broadcast(
+        agent_id="coordinator",
+        data={
+            "audit_approved": True,
+            "reviewer": "agent_audit_bot",
+        },
+        tags=["audit"],
+    )
+
+    # ------------------------------------------------------------------
+    # 7. Full context snapshot
+    # ------------------------------------------------------------------
+    section("7. Full context snapshot (read_all)")
+
+    snapshot = bridge.read_all("monitor")
+    print(json.dumps(snapshot, indent=2, ensure_ascii=False))
+
+    # ------------------------------------------------------------------
+    # 8. Merge demo
+    # ------------------------------------------------------------------
+    section("8. Merge — remote snapshot into local context")
+
+    remote_ctx = SharedContext(namespace="data_pipeline_v1")
+    remote_ctx.set("status",   "archived",  "archiver")
+    remote_ctx.set("archived_at", "2026-02-18T19:00:00", "archiver")
+
+    ctx.merge(remote_ctx)
+    print("  Merged remote context (status overridden by higher version).")
+    print(f"  status after merge → {ctx.get('status')!r}")
+
+    # ------------------------------------------------------------------
+    # 9. Export final context to JSON
+    # ------------------------------------------------------------------
+    section("9. Final context — to_json() export")
+
+    print(ctx.to_json())
+
+    # ------------------------------------------------------------------
+    # 10. Monitor change summary
+    # ------------------------------------------------------------------
+    section("10. Monitor change log summary")
+
+    print(f"  Total changes observed by monitor: {len(_change_log)}")
+    for i, entry in enumerate(_change_log, 1):
+        print(f"  {i:2d}. [{entry['agent_id']:15s}] {entry['key']}")
+
+    section("Demo complete")
+
+
+if __name__ == "__main__":
+    main()

Reply via email to