Appointat commented on code in PR #737:
URL: https://github.com/apache/geaflow/pull/737#discussion_r2796568239


##########
geaflow-ai/src/operator/casts/casts/core/schema.py:
##########
@@ -0,0 +1,129 @@
+"""Graph schema implementation for CASTS system.
+
+This module provides concrete schema implementations that decouple
+graph structure metadata from execution logic.
+"""
+
+from enum import Enum
+from typing import Any
+
+from casts.core.interfaces import GraphSchema
+
+
+class SchemaState(str, Enum):
+    """Lifecycle state for schema extraction and validation."""
+
+    DIRTY = "dirty"
+    READY = "ready"
+
+
+class InMemoryGraphSchema(GraphSchema):
+    """In-memory implementation of GraphSchema for CASTS data sources."""
+
+    def __init__(
+        self, nodes: dict[str, dict[str, Any]], edges: dict[str, list[dict[str, str]]]
+    ):
+        """Initialize schema from graph data.
+
+        Args:
+            nodes: Dictionary of node_id -> node_properties
+            edges: Dictionary of source_node_id -> list of edge dicts
+        """
+        self._nodes = nodes
+        self._edges = edges
+        self._state = SchemaState.DIRTY
+        self._reset_cache()
+        self.rebuild()
+
+    def mark_dirty(self) -> None:
+        """Mark schema as dirty when underlying graph data changes."""
+        self._state = SchemaState.DIRTY
+
+    def rebuild(self) -> None:
+        """Rebuild schema caches from the current graph data."""
+        self._reset_cache()
+        self._extract_schema()
+        self._state = SchemaState.READY
+
+    def _ensure_ready(self) -> None:
+        """Ensure schema caches are initialized before read operations."""
+        if self._state == SchemaState.DIRTY:
+            self.rebuild()
+
+    def _reset_cache(self) -> None:
+        """Reset cached schema data structures."""
+        self._node_types: set[str] = set()
+        self._edge_labels: set[str] = set()
+        self._node_type_schemas: dict[str, dict[str, Any]] = {}
+        self._node_edge_labels: dict[str, list[str]] = {}
+        self._node_incoming_edge_labels: dict[str, list[str]] = {}
+
+    def _extract_schema(self) -> None:
+        """Extract schema information from graph data."""
+        for node_id in self._nodes:
+            self._node_incoming_edge_labels[node_id] = []
+
+        for source_id, out_edges in self._edges.items():
+            if source_id in self._nodes:
+                out_labels = sorted({edge["label"] for edge in out_edges})

Review Comment:
   Updated the schema extraction loop to use constants for the edge keys
   (EDGE_LABEL_KEY / EDGE_TARGET_KEY) instead of hard-coded string literals.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to