Leomrlin commented on code in PR #737:
URL: https://github.com/apache/geaflow/pull/737#discussion_r2787478810


##########
geaflow-ai/src/operator/casts/casts/data/__init__.py:
##########


Review Comment:
   Please add a README in the module root that briefly explains the origin of 
the CASTS name, the design goals, the division of responsibilities among the 
major components, and how to run the code (execution examples or a demo).



##########
geaflow-ai/src/operator/casts/casts/core/schema.py:
##########
@@ -0,0 +1,129 @@
+"""Graph schema implementation for CASTS system.
+
+This module provides concrete schema implementations that decouple
+graph structure metadata from execution logic.
+"""
+
+from enum import Enum
+from typing import Any
+
+from casts.core.interfaces import GraphSchema
+
+
+class SchemaState(str, Enum):
+    """Lifecycle state for schema extraction and validation."""
+
+    DIRTY = "dirty"
+    READY = "ready"
+
+
+class InMemoryGraphSchema(GraphSchema):
+    """In-memory implementation of GraphSchema for CASTS data sources."""
+
+    def __init__(
+        self, nodes: dict[str, dict[str, Any]], edges: dict[str, list[dict[str, str]]]
+    ):
+        """Initialize schema from graph data.
+
+        Args:
+            nodes: Dictionary of node_id -> node_properties
+            edges: Dictionary of source_node_id -> list of edge dicts
+        """
+        self._nodes = nodes
+        self._edges = edges
+        self._state = SchemaState.DIRTY
+        self._reset_cache()
+        self.rebuild()
+
+    def mark_dirty(self) -> None:
+        """Mark schema as dirty when underlying graph data changes."""
+        self._state = SchemaState.DIRTY
+
+    def rebuild(self) -> None:
+        """Rebuild schema caches from the current graph data."""
+        self._reset_cache()
+        self._extract_schema()
+        self._state = SchemaState.READY
+
+    def _ensure_ready(self) -> None:
+        """Ensure schema caches are initialized before read operations."""
+        if self._state == SchemaState.DIRTY:
+            self.rebuild()
+
+    def _reset_cache(self) -> None:
+        """Reset cached schema data structures."""
+        self._node_types: set[str] = set()
+        self._edge_labels: set[str] = set()
+        self._node_type_schemas: dict[str, dict[str, Any]] = {}
+        self._node_edge_labels: dict[str, list[str]] = {}
+        self._node_incoming_edge_labels: dict[str, list[str]] = {}
+
+    def _extract_schema(self) -> None:
+        """Extract schema information from graph data."""
+        for node_id in self._nodes:
+            self._node_incoming_edge_labels[node_id] = []
+
+        for source_id, out_edges in self._edges.items():
+            if source_id in self._nodes:
+                out_labels = sorted({edge["label"] for edge in out_edges})

Review Comment:
   Frequently used dictionary keys such as `"label"` should be declared as 
named constants; the same applies to the `target` key used further down.
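
   A minimal sketch of what this could look like. The constant names and the 
helper functions are hypothetical and do not appear in the PR; the `"target"` 
key is assumed from the comment above.

```python
# Hypothetical constant names; the PR does not define them.
EDGE_LABEL_KEY = "label"
EDGE_TARGET_KEY = "target"  # assumed key name, taken from the review comment


def outgoing_labels(out_edges: list[dict[str, str]]) -> list[str]:
    """Return the sorted, de-duplicated labels of a node's outgoing edges."""
    return sorted({edge[EDGE_LABEL_KEY] for edge in out_edges})


def edge_targets(out_edges: list[dict[str, str]]) -> list[str]:
    """Return the target node ids of the given edges, in input order."""
    return [edge[EDGE_TARGET_KEY] for edge in out_edges]
```

   With the keys defined once, a typo in a key name fails at import time 
instead of surfacing as a `KeyError` deep inside schema extraction.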



##########
geaflow-ai/src/operator/casts/casts/core/interfaces.py:
##########
@@ -0,0 +1,195 @@
+"""Core interfaces and abstractions for CASTS system.
+
+This module defines the key abstractions that enable dependency injection
+and adherence to SOLID principles, especially Dependency Inversion Principle (DIP).
+"""
+
+from abc import ABC, abstractmethod
+from typing import Any, Protocol
+
+import numpy as np
+
+
+class GoalGenerator(ABC):
+    """Abstract interface for generating traversal goals based on graph schema."""
+
+    @property
+    @abstractmethod
+    def goal_texts(self) -> list[str]:
+        """Get list of available goal descriptions."""
+        pass
+
+    @property
+    @abstractmethod
+    def goal_weights(self) -> list[int]:
+        """Get weights for goal selection (higher = more frequent)."""
+        pass
+
+    @abstractmethod
+    def select_goal(self, node_type: str | None = None) -> tuple[str, str]:
+        """Select a goal based on weights and optional node type context.
+
+        Returns:
+            Tuple of (goal_text, evaluation_rubric)
+        """
+        pass
+
+
+class GraphSchema(ABC):
+    """Abstract interface for graph schema describing structural constraints."""
+
+    @property
+    @abstractmethod
+    def node_types(self) -> set[str]:
+        """Get all node types in the graph."""
+        pass
+
+    @property
+    @abstractmethod
+    def edge_labels(self) -> set[str]:
+        """Get all edge labels in the graph."""
+        pass
+
+    @abstractmethod
+    def get_node_schema(self, node_type: str) -> dict[str, Any]:
+        """Get schema information for a specific node type."""
+        pass
+
+    @abstractmethod
+    def get_valid_outgoing_edge_labels(self, node_id: str) -> list[str]:

Review Comment:
   Should the interface for obtaining outgoing edge labels take the vertex 
label rather than a specific entity ID, so that the computation stays at the 
metadata level? Passing an ID requires fetching the entity, reading its label, 
and only then computing the types of the adjacent edges.
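
   A rough sketch of a label-keyed lookup, to illustrate the suggestion. The 
`"type"` property name and the function name are assumptions for illustration 
only; the PR may store the vertex label under a different key.

```python
from collections import defaultdict

NODE_TYPE_KEY = "type"    # assumed property that holds the vertex label
EDGE_LABEL_KEY = "label"


def build_outgoing_labels_by_type(
    nodes: dict[str, dict[str, str]],
    edges: dict[str, list[dict[str, str]]],
) -> dict[str, list[str]]:
    """Map each vertex label to the sorted edge labels leaving vertices of that label."""
    labels_by_type: dict[str, set[str]] = defaultdict(set)
    for source_id, out_edges in edges.items():
        node = nodes.get(source_id)
        if node is None:
            continue  # edge list for an unknown node; skip it
        labels_by_type[node[NODE_TYPE_KEY]].update(
            edge[EDGE_LABEL_KEY] for edge in out_edges
        )
    return {node_type: sorted(labels) for node_type, labels in labels_by_type.items()}
```

   Once this map is built, a lookup by vertex label is a single dictionary 
access and never touches individual entities.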



##########
geaflow-ai/src/operator/casts/casts/core/__init__.py:
##########


Review Comment:
   Every file in the project must start with an Apache License header, even if 
the file is otherwise empty.
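
   For reference, this is the standard ASF source header written as Python 
comments, followed by a small illustrative check. The `has_license_header` 
helper is hypothetical; ASF projects commonly rely on Apache RAT for this 
check, and the exact header wording should follow the project's existing files.

```python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

LICENSE_MARKER = "Licensed to the Apache Software Foundation (ASF)"


def has_license_header(source: str, max_lines: int = 20) -> bool:
    """Return True if the marker appears within the first max_lines lines."""
    head = source.splitlines()[:max_lines]
    return any(LICENSE_MARKER in line for line in head)
```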



##########
geaflow-ai/src/operator/casts/casts/core/strategy_cache.py:
##########
@@ -0,0 +1,205 @@
+"""Core strategy cache service for storing and retrieving traversal strategies."""
+
+import re
+from typing import Any, Literal
+
+from casts.core.models import Context, StrategyKnowledgeUnit
+from casts.utils.helpers import (
+    calculate_dynamic_similarity_threshold,
+    calculate_tier2_threshold,
+    cosine_similarity,
+)
+
+MatchType = Literal["Tier1", "Tier2", ""]
+
+
+class StrategyCache:
+    """CASTS Strategy Cache for storing and matching traversal strategies (SKUs).
+
+    Implements the two-tier matching system described in 数学建模.md Section 4:
+        - Tier 1 (Strict Logic): Exact structural + goal match with predicate Φ(p)
+        - Tier 2 (Similarity): Embedding-based fallback with adaptive threshold
+
+    Mathematical model alignment:
+        - Tier 1 candidates: C_strict(c) where η ≥ η_min
+        - Tier 2 candidates: C_sim(c) where η ≥ η_tier2(η_min) = γ · η_min
+        - Similarity threshold: δ_sim(v) = 1 - κ / (σ_logic · (1 + β · log(η)))
+
+    Hyperparameters (configurable for experiments):
+        - min_confidence_threshold (η_min): Tier 1 baseline confidence
+        - tier2_gamma (γ): Tier 2 confidence scaling factor (γ > 1)
+        - similarity_kappa (κ): Base threshold sensitivity
+        - similarity_beta (β): Frequency sensitivity (popularity sensitivity)
+
+    Note: Higher η (confidence) → higher δ_sim → stricter matching requirement
+    """
+
+    def __init__(self, embed_service: Any, config: Any):
+        self.knowledge_base: list[StrategyKnowledgeUnit] = []
+        self.embed_service = embed_service
+
+        # Get all hyperparameters from the configuration object
+        # Default values balance exploration and safety (see config.py for detailed rationale)
+        # Note: Higher κ → lower threshold → more permissive (counter-intuitive!)
+        self.min_confidence_threshold = config.get_float("CACHE_MIN_CONFIDENCE_THRESHOLD")
+        self.current_schema_fingerprint = config.get_str("CACHE_SCHEMA_FINGERPRINT")
+        self.similarity_kappa = config.get_float("CACHE_SIMILARITY_KAPPA")
+        self.similarity_beta = config.get_float("CACHE_SIMILARITY_BETA")
+        self.tier2_gamma = config.get_float("CACHE_TIER2_GAMMA")
+        self.signature_level = config.get_int("SIGNATURE_LEVEL")
+        self.edge_whitelist = config.get("SIGNATURE_EDGE_WHITELIST")
+
+    async def find_strategy(
+        self,
+        context: Context,
+        skip_tier1: bool = False,
+    ) -> tuple[str | None, StrategyKnowledgeUnit | None, MatchType]:
+        """
+        Find a matching strategy for the given context.
+
+        Returns:
+            Tuple of (decision_template, strategy_knowledge_unit, match_type)
+            match_type: 'Tier1', 'Tier2', or ''
+
+        Two-tier matching:
+        - Tier 1: Strict logic matching (exact structural signature, goal, schema, and predicate)
+        - Tier 2: Similarity-based fallback (vector similarity when Tier 1 fails)
+        """
+        # Tier 1: Strict Logic Matching
+        tier1_candidates = []
+        if not skip_tier1:  # Can bypass Tier1 for testing
+            for sku in self.knowledge_base:
+                # Exact matching on structural signature, goal, and schema
+                if (
+                    self._signatures_match(context.structural_signature, sku.structural_signature)
+                    and sku.goal_template == context.goal
+                    and sku.schema_fingerprint == self.current_schema_fingerprint
+                ):
+                    # Predicate only uses safe properties (no identity fields)
+                    try:
+                        if sku.confidence_score >= self.min_confidence_threshold and sku.predicate(
+                            context.safe_properties
+                        ):
+                            tier1_candidates.append(sku)
+                    except (KeyError, TypeError, ValueError, AttributeError) as e:
+                        # Defensive: some predicates may error on missing fields
+                        print(f"[warn] Tier1 predicate error on SKU {sku.id}: {e}")
+                        continue
+
+        if tier1_candidates:
+            # Pick best by confidence score
+            best_sku = max(tier1_candidates, key=lambda x: x.confidence_score)
+            return best_sku.decision_template, best_sku, "Tier1"
+
+        # Tier 2: Similarity-based Fallback (only if Tier 1 fails)
+        tier2_candidates = []
+        # Vector embedding based on safe properties only
+        property_vector = await self.embed_service.embed_properties(context.safe_properties)
+        # Compute Tier 2 confidence threshold η_tier2(η_min)
+        tier2_confidence_threshold = calculate_tier2_threshold(
+            self.min_confidence_threshold, self.tier2_gamma
+        )
+
+        for sku in self.knowledge_base:
+            # Require exact match on structural signature, goal, and schema
+            if (
+                self._signatures_match(context.structural_signature, sku.structural_signature)
+                and sku.goal_template == context.goal
+                and sku.schema_fingerprint == self.current_schema_fingerprint
+            ):
+                if sku.confidence_score >= tier2_confidence_threshold:  # Higher bar for Tier 2
+                    similarity = cosine_similarity(property_vector, sku.property_vector)
+                    threshold = calculate_dynamic_similarity_threshold(
+                        sku, self.similarity_kappa, self.similarity_beta
+                    )
+                    print(
+                        f"[debug] SKU {sku.id} - similarity: {similarity:.4f}, "
+                        f"threshold: {threshold:.4f}"
+                    )
+                    if similarity >= threshold:
+                        tier2_candidates.append((sku, similarity))
+
+        if tier2_candidates:
+            # Rank by confidence score primarily
+            best_sku, similarity = max(tier2_candidates, key=lambda x: x[0].confidence_score)
+            return best_sku.decision_template, best_sku, "Tier2"
+
+        # Explicitly type-safe None return for all components
+        return None, None, ""
+
+    def _to_abstract_signature(self, signature: str) -> str:
+        """Convert a canonical Level-2 signature to the configured abstraction level."""
+        if self.signature_level == 2:
+            return signature
+
+        abstract_parts = []
+        steps = signature.split('.')
+        for i, step in enumerate(steps):
+            if i == 0:
+                abstract_parts.append(step)
+                continue
+
+            match = re.match(r"([a-zA-Z_][a-zA-Z0-9_]*)(\(.*\))?", step)
+            if not match:
+                abstract_parts.append(step)
+                continue
+
+            op = match.group(1)
+            params = match.group(2) or "()"
+
+            # Level 0: Abstract everything
+            if self.signature_level == 0:
+                if op in ["out", "in", "both", "outE", "inE", "bothE"]:
+                    base_op = op.replace("E", "").replace("V", "")
+                    abstract_parts.append(f"{base_op}()")
+                else:
+                    abstract_parts.append("filter()")
+                continue
+
+            # Level 1: Edge-aware
+            if self.signature_level == 1:
+                if op in ["out", "in", "both", "outE", "inE", "bothE"]:
+                    if self.edge_whitelist:
+                        label_match = re.search(r"\('([^']+)'\)", params)
+                        if label_match and label_match.group(1) in self.edge_whitelist:
+                            abstract_parts.append(step)
+                        else:
+                            base_op = op.replace("E", "").replace("V", "")
+                            abstract_parts.append(f"{base_op}()")
+                    else:
+                        abstract_parts.append(step)
+                else:
+                    abstract_parts.append("filter()")
+
+        return ".".join(abstract_parts)
+
+    def _signatures_match(self, runtime_sig: str, stored_sig: str) -> bool:
+        """Check if two canonical signatures match at the configured abstraction level."""
+        runtime_abstract = self._to_abstract_signature(runtime_sig)
+        stored_abstract = self._to_abstract_signature(stored_sig)
+        return runtime_abstract == stored_abstract
+
+    def add_sku(self, sku: StrategyKnowledgeUnit) -> None:
+        """Add a new Strategy Knowledge Unit to the cache."""
+        self.knowledge_base.append(sku)
+
+    def update_confidence(self, sku: StrategyKnowledgeUnit, success: bool) -> None:
+        """
+        Update confidence score using AIMD (Additive Increase, Multiplicative Decrease).
+
+        Args:
+            sku: The strategy knowledge unit to update
+            success: Whether the strategy execution was successful
+        """
+        if success:
+            # Additive increase
+            sku.confidence_score += 1.0
+        else:
+            # Multiplicative decrease (penalty)
+            sku.confidence_score *= 0.5
+            # Ensure confidence doesn't drop below minimum
+            sku.confidence_score = max(0.1, sku.confidence_score)
+
+    def cleanup_low_confidence_skus(self) -> None:
+        """Remove SKUs that have fallen below the minimum confidence threshold."""
+        self.knowledge_base = [sku for sku in self.knowledge_base if sku.confidence_score >= 0.1]

Review Comment:
   The hard-coded threshold of 0.1 is inconsistent with the 
`min_confidence_threshold` defined in `config.py`.
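
   One possible way to remove the inconsistency, sketched under assumptions: 
`Sku` is a stand-in for `StrategyKnowledgeUnit`, and `ConfidencePolicy` is a 
hypothetical helper that takes the configured threshold once and uses it for 
pruning. The sketch also drops the 0.1 floor in the update step, because a 
score clamped to 0.1 can never fall below a 0.1 cleanup threshold; whether 
that matches the intended behavior is for the author to decide.

```python
from dataclasses import dataclass


@dataclass
class Sku:
    """Stand-in for StrategyKnowledgeUnit with only the fields used here."""

    id: str
    confidence_score: float


class ConfidencePolicy:
    """AIMD confidence updates with one configurable pruning threshold."""

    def __init__(self, min_confidence: float, increase: float = 1.0, decay: float = 0.5):
        self.min_confidence = min_confidence  # e.g. read from the configuration object
        self.increase = increase
        self.decay = decay

    def update(self, sku: Sku, success: bool) -> None:
        """Add `increase` on success, multiply by `decay` on failure."""
        if success:
            sku.confidence_score += self.increase
        else:
            sku.confidence_score *= self.decay

    def prune(self, skus: list[Sku]) -> list[Sku]:
        """Keep only SKUs at or above the configured minimum confidence."""
        return [sku for sku in skus if sku.confidence_score >= self.min_confidence]
```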



##########
geaflow-ai/src/operator/casts/casts/core/config.py:
##########
@@ -0,0 +1,210 @@
+"""Configuration management for CASTS system.
+
+Provides a clean abstraction over configuration sources (environment variables,
+config files, etc.) to eliminate hard-coded values.
+"""
+
+import os
+from typing import Any, Literal
+
+from dotenv import load_dotenv
+
+from casts.core.interfaces import Configuration
+
+# Load environment variables from .env file
+load_dotenv()
+
+
+class DefaultConfiguration(Configuration):
+    """Default configuration with hardcoded values for CASTS.
+
+    All configuration values are defined as class attributes for easy modification.
+    This eliminates the need for .env files while keeping configuration centralized.
+    """
+
+    # ============================================
+    # EMBEDDING SERVICE CONFIGURATION
+    # ============================================
+    EMBEDDING_ENDPOINT = os.environ.get("EMBEDDING_ENDPOINT", "")
+    EMBEDDING_APIKEY = os.environ.get("EMBEDDING_APIKEY", "YOUR_EMBEDDING_API_KEY_HERE")

Review Comment:
   It's recommended to remove default values and provide explicit error 
messages when required values are empty.
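
   A minimal sketch of the suggested behavior. `require_env` and 
`MissingConfigurationError` are hypothetical names, not part of the PR; the 
optional `environ` argument exists only to make the helper easy to test.

```python
import os
from collections.abc import Mapping
from typing import Optional


class MissingConfigurationError(RuntimeError):
    """Raised when a required configuration value is absent or empty."""


def require_env(name: str, environ: Optional[Mapping[str, str]] = None) -> str:
    """Return the value of a required variable, or fail with a clear message."""
    env = os.environ if environ is None else environ
    value = env.get(name, "").strip()
    if not value:
        raise MissingConfigurationError(
            f"Required setting {name!r} is missing or empty; "
            f"set it in the environment or in the .env file."
        )
    return value
```

   Failing at startup with the variable name in the message is easier to 
diagnose than a later authentication error caused by a placeholder API key.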



##########
geaflow-ai/src/operator/casts/casts/core/gremlin_state.py:
##########
@@ -0,0 +1,265 @@
+"""Gremlin traversal state machine for validating graph traversal steps."""
+
+from dataclasses import dataclass
+from typing import Literal, Sequence, TypedDict
+
+from casts.core.interfaces import GraphSchema
+
+
+GremlinState = Literal["V", "E", "P", "END"]
+
+
+class GremlinStateDefinition(TypedDict):
+    """Typed representation of a Gremlin state definition."""
+
+    options: list[str]
+    transitions: dict[str, GremlinState]
+
+
+# Gremlin Step State Machine
+# Defines valid transitions between step types (V: Vertex, E: Edge, P: Property)
+GREMLIN_STEP_STATE_MACHINE: dict[GremlinState, GremlinStateDefinition] = {
+    # State: current element is a Vertex
+    "V": {
+        "options": [
+            "out('label')",
+            "in('label')",
+            "both('label')",
+            "outE('label')",
+            "inE('label')",
+            "bothE('label')",
+            "has('prop','value')",
+            "dedup()",
+            "simplePath()",
+            "order().by('prop')",
+            "limit(n)",
+            "values('prop')",
+            "stop",
+        ],
+        "transitions": {
+            "out": "V",
+            "in": "V",
+            "both": "V",
+            "outE": "E",
+            "inE": "E",
+            "bothE": "E",
+            "has": "V",
+            "dedup": "V",
+            "simplePath": "V",
+            "order": "V",
+            "limit": "V",
+            "values": "P",
+            "stop": "END",
+        },
+    },
+    # State: current element is an Edge
+    "E": {
+        "options": [
+            "inV()",
+            "outV()",
+            "otherV()",
+            "has('prop','value')",
+            "dedup()",
+            "simplePath()",
+            "order().by('prop')",
+            "limit(n)",
+            "values('prop')",
+            "stop",
+        ],
+        "transitions": {
+            "inV": "V",
+            "outV": "V",
+            "otherV": "V",
+            "has": "E",
+            "dedup": "E",
+            "simplePath": "E",
+            "order": "E",
+            "limit": "E",
+            "values": "P",
+            "stop": "END",
+        },
+    },
+    # State: current element is a Property/Value
+    "P": {
+        "options": ["order()", "limit(n)", "dedup()", "simplePath()", "stop"],
+        "transitions": {
+            "order": "P",
+            "limit": "P",
+            "dedup": "P",
+            "simplePath": "P",
+            "stop": "END",
+        },
+    },
+    "END": {"options": [], "transitions": {}},
+}
+
+_MODIFIER_STEPS = {"by"}
+_MODIFIER_COMPATIBILITY = {"by": {"order"}}
+
+
+@dataclass(frozen=True)
+class ParsedStep:
+    """Parsed step representation for traversal signatures."""
+
+    raw: str
+    name: str
+
+
+def _normalize_signature(signature: str) -> str:
+    """Normalize a traversal signature by stripping the V() prefix and separators."""
+    normalized = signature.strip()
+    if not normalized or normalized == "V()":
+        return ""
+
+    if normalized.startswith("V()"):
+        normalized = normalized[3:]
+    elif normalized.startswith("V"):
+        normalized = normalized[1:]
+
+    return normalized.lstrip(".")
+
+
+def _split_steps(signature: str) -> list[str]:
+    """Split a traversal signature into raw step segments."""
+    if not signature:
+        return []
+
+    steps: list[str] = []
+    current: list[str] = []
+    depth = 0
+
+    for ch in signature:
+        if ch == "." and depth == 0:
+            if current:
+                steps.append("".join(current))
+                current = []
+            continue
+
+        if ch == "(":
+            depth += 1
+        elif ch == ")":
+            depth = max(depth - 1, 0)
+
+        current.append(ch)
+
+    if current:
+        steps.append("".join(current))
+
+    return [step for step in steps if step]
+
+
+def _extract_step_name(step: str) -> str:
+    """Extract the primary step name from a step string."""
+    head = step.split("(", 1)[0]
+    if "." in head:
+        return head.split(".", 1)[0]
+    return head
+
+
+def _combine_modifiers(steps: Sequence[str]) -> list[str]:
+    """Combine modifier steps (e.g., order().by()) into a single step string."""
+    combined: list[str] = []
+    for step in steps:
+        step_name = _extract_step_name(step)
+        if step_name in _MODIFIER_STEPS and combined:
+            previous_name = _extract_step_name(combined[-1])
+            if previous_name in _MODIFIER_COMPATIBILITY.get(step_name, set()):
+                combined[-1] = f"{combined[-1]}.{step}"
+                continue
+        combined.append(step)
+    return combined
+
+
+def _parse_traversal_signature(signature: str) -> list[ParsedStep]:
+    """Parse traversal signature into steps with normalized names."""
+    normalized = _normalize_signature(signature)
+    raw_steps = _combine_modifiers(_split_steps(normalized))
+    return [ParsedStep(raw=step, name=_extract_step_name(step)) for step in raw_steps]
+
+
+class GremlinStateMachine:
+    """State machine for validating Gremlin traversal steps and determining next valid options."""
+
+    @staticmethod
+    def parse_traversal_signature(structural_signature: str) -> list[str]:
+        """Parse traversal signature into decision steps for display or history."""
+        return [step.raw for step in _parse_traversal_signature(structural_signature)]
+
+    @staticmethod
+    def get_state_and_options(
+        structural_signature: str, graph_schema: GraphSchema, node_id: str

Review Comment:
   Does `graph_schema` support dynamic modification? It is passed in on every 
state-machine invocation, so the schema seen by one call may differ from the 
schema seen by the next.



##########
geaflow-ai/src/operator/casts/casts/core/config.py:
##########
@@ -0,0 +1,210 @@
+"""Configuration management for CASTS system.

Review Comment:
   It's better not to mix this with the Java code; place CASTS in a dedicated 
folder such as `geaflow-ai/casts`. Also, why does `casts` have two nested 
directories with the same name?



##########
geaflow-ai/src/operator/casts/casts/core/interfaces.py:
##########
@@ -0,0 +1,195 @@
+"""Core interfaces and abstractions for CASTS system.
+
+This module defines the key abstractions that enable dependency injection
+and adherence to SOLID principles, especially Dependency Inversion Principle (DIP).
+"""
+
+from abc import ABC, abstractmethod
+from typing import Any, Protocol
+
+import numpy as np
+
+
+class GoalGenerator(ABC):
+    """Abstract interface for generating traversal goals based on graph schema."""
+
+    @property
+    @abstractmethod
+    def goal_texts(self) -> list[str]:
+        """Get list of available goal descriptions."""
+        pass
+
+    @property
+    @abstractmethod
+    def goal_weights(self) -> list[int]:

Review Comment:
   Is it somewhat limiting to restrict the weights to integers? Should they be 
floats instead?
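
   For illustration, weighted selection works unchanged with float weights when 
it is built on the standard library's `random.choices`. The free function 
below is a hypothetical stand-in for `select_goal` and ignores the node type 
and rubric handling of the real interface.

```python
import random
from typing import Optional


def pick_goal(
    goal_texts: list[str],
    goal_weights: list[float],
    rng: Optional[random.Random] = None,
) -> str:
    """Pick one goal text with probability proportional to its weight."""
    if len(goal_texts) != len(goal_weights):
        raise ValueError("goal_texts and goal_weights must have the same length")
    rng = rng or random.Random()
    # random.choices accepts int or float weights; they need not sum to 1.
    return rng.choices(goal_texts, weights=goal_weights, k=1)[0]
```

   Declaring the return type of `goal_weights` as `list[float]` would still 
accept integer weights from existing implementations.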



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

