Leomrlin commented on code in PR #737:
URL: https://github.com/apache/geaflow/pull/737#discussion_r2791169238
##########
geaflow-ai/src/operator/casts/casts/data/graph_generator.py:
##########
@@ -0,0 +1,370 @@
+"""Graph data utilities for CASTS simulations.
+
+This module supports two data sources:
+
+1. Synthetic graph data with Zipf-like distribution (default).
+2. Real transaction/relationship data loaded from CSV files under
``real_graph_data/``.
+
+Use :class:`GraphGenerator` as the unified in-memory representation. The
simulation
+engine and other components should treat it as read-only.
+"""
+
+import csv
+from dataclasses import dataclass
+from pathlib import Path
+import random
+from typing import Any
+
+import networkx as nx
+
+
+@dataclass
+class GraphGeneratorConfig:
+ """Configuration for building graph data.
+
+ Attributes:
+ use_real_data: Whether to build from real CSV files instead of
synthetic data.
+ real_data_dir: Directory containing the ``*.csv`` relationship tables.
+ real_subgraph_size: Maximum number of nodes to keep when sampling a
+ connected subgraph from real data. If ``None``, use the full graph.
+ """
+
+ use_real_data: bool = False
+ real_data_dir: str | None = None
+ real_subgraph_size: int | None = None
+
+
+class GraphGenerator:
+ """Unified graph container used by the simulation.
+
+ - By default, it generates synthetic graph data with realistic business
+ entity relationships.
+ - When ``config.use_real_data`` is True, it instead loads nodes/edges from
+ ``real_graph_data`` CSV files and optionally samples a connected subgraph
+ to control size while preserving edge integrity.
+ """
+
+ def __init__(self, size: int = 30, config: GraphGeneratorConfig | None =
None):
+ self.nodes: dict[str, dict[str, Any]] = {}
+ self.edges: dict[str, list[dict[str, str]]] = {}
+
+ self.config = config or GraphGeneratorConfig()
+ self.source_label = "synthetic"
+
+ if self.config.use_real_data:
+ self._load_real_graph()
+ self.source_label = "real"
+ else:
+ self._generate_zipf_data(size)
+
+ def to_networkx(self) -> nx.DiGraph:
+ """Convert to NetworkX graph for visualization and analysis."""
+ G: nx.DiGraph = nx.DiGraph()
+ for node_id, node in self.nodes.items():
+ G.add_node(node_id, **node)
+ for node_id, edge_list in self.edges.items():
+ for edge in edge_list:
+ G.add_edge(node_id, edge['target'], label=edge['label'])
+ return G
+
+ # ------------------------------------------------------------------
+ # Synthetic data (existing behavior)
+ # ------------------------------------------------------------------
+
+ def _generate_zipf_data(self, size: int) -> None:
+ """Generate graph data following Zipf distribution for realistic
entity distributions."""
+ # Use concrete, realistic business roles instead of abstract types
+ # Approximate Zipf: "Retail SME" is most common, "FinTech Startup" is
rarest
+ business_types = [
+ "Retail SME", # Most common - small retail businesses
+ "Logistics Partner", # Medium frequency - logistics providers
+ "Enterprise Vendor", # Medium frequency - large vendors
+ "Regional Distributor", # Less common - regional distributors
+ "FinTech Startup", # Rarest - fintech companies
+ ]
+ # Weights approximating 1/k distribution
+ type_weights = [100, 50, 25, 12, 6]
+
+ business_categories = ["retail", "wholesale", "finance",
"manufacturing"]
+ regions = ["NA", "EU", "APAC", "LATAM"]
+ risk_levels = ["low", "medium", "high"]
+
+ # Generate nodes
+ for i in range(size):
+ node_type = random.choices(business_types, weights=type_weights,
k=1)[0]
+ status = "active" if random.random() < 0.8 else "inactive"
+ age = random.randint(18, 60)
+
+ node = {
+ "id": str(i),
+ "type": node_type,
+ "status": status,
+ "age": age,
+ "category": random.choice(business_categories),
+ "region": random.choice(regions),
+ "risk": random.choices(risk_levels, weights=[60, 30, 10])[0],
+ }
+ self.nodes[str(i)] = node
+ self.edges[str(i)] = []
+
+ # Generate edges with realistic relationship labels
+ edge_labels = ["related", "friend", "knows", "supplies", "manages"]
+ for i in range(size):
+ num_edges = random.randint(1, 4)
+ for _ in range(num_edges):
+ target = random.randint(0, size - 1)
+ if target != i:
+ label = random.choice(edge_labels)
+ # Ensure common "Retail SME" has more 'related' edges
+ # and "Logistics Partner" has more 'friend' edges for
interesting simulation
+ if self.nodes[str(i)]["type"] == "Retail SME" and
random.random() < 0.7:
+ label = "related"
+ elif (
+ self.nodes[str(i)]["type"] == "Logistics Partner"
+ and random.random() < 0.7
+ ):
+ label = "friend"
+
+ self.edges[str(i)].append({"target": str(target), "label":
label})
+
+ # ------------------------------------------------------------------
+ # Real data loading and subgraph sampling
+ # ------------------------------------------------------------------
+
+ def _load_real_graph(self) -> None:
Review Comment:
It might be better to place the specific implementation of `load_real_graph`
for real data externally, allowing users to add their own implementations
rather than hardcoding it into `graph_generator.py`.
##########
geaflow-ai/src/operator/casts/casts/data/sources.py:
##########
@@ -0,0 +1,942 @@
+"""Data source implementations for CASTS system.
+
+This module provides concrete implementations of the DataSource interface
+for both synthetic and real data sources.
+"""
+
+from collections import deque
+import csv
+from pathlib import Path
+import random
+from typing import Any
+
+import networkx as nx
+
+from casts.core.config import DefaultConfiguration
+from casts.core.interfaces import Configuration, DataSource, GoalGenerator,
GraphSchema
+from casts.core.schema import InMemoryGraphSchema
+
+
+class SyntheticBusinessGraphGoalGenerator(GoalGenerator):
+ """Goal generator for (Synthetic) business/financial graphs."""
+
+ def __init__(self):
+ # Emphasize multi-hop + relation types to give the LLM
+ # a clearer signal about traversable edges.
+ self._goals = [
+ (
+ "Map how risk propagates through multi-hop business "
+ "relationships (friend, supplier, partner, investor, "
+ "customer) based on available data",
+ "Score is based on the number of hops and the variety of
relationship types "
+ "(friend, supplier, partner, etc.) traversed. Paths that stay
within one "
+ "relationship type are less valuable.",
+ ),
+ (
+ "Discover natural community structures that emerge from "
+ "active entity interactions along friend and partner "
+ "relationships",
+ "Score is based on the density of connections found. Paths
that identify nodes "
+ "with many shared 'friend' or 'partner' links are more
valuable. Simple long "
+ "chains are less valuable.",
+ ),
+ (
+ "Recommend smarter supplier alternatives by walking "
+ "along supplier and customer chains and learning from "
+ "historical risk-category patterns",
+ "Score is based on ability to traverse 'supplier' and
'customer' chains. "
+ "The longer the chain, the better. Paths that don't follow
these "
+ "relationships should be penalized.",
+ ),
+ (
+ "Trace fraud signals across investor / partner / customer "
+ "relationship chains using real-time metrics, without "
+ "assuming globally optimal paths",
+ "Score is based on the length and complexity of chains
involving 'investor', "
+ "'partner', and 'customer' relationships. Paths that connect
disparate parts "
+ "of the graph are more valuable.",
+ ),
+ (
+ "Uncover hidden cross-region business connections through "
+ "accumulated domain knowledge and repeated traversals over "
+ "friend / partner edges",
+ "Score is based on the ability to connect nodes from different
'region' "
+ "properties using 'friend' or 'partner' edges. A path that
starts in 'NA' "
+ "and ends in 'EU' is high value.",
+ ),
+ ]
+ self._goal_weights = [100, 60, 40, 25, 15]
+
+ @property
+ def goal_texts(self) -> list[str]:
+ return [g[0] for g in self._goals]
+
+ @property
+ def goal_weights(self) -> list[int]:
+ return self._goal_weights.copy()
+
+ def select_goal(self, node_type: str | None = None) -> tuple[str, str]:
+ """Select a goal and its rubric based on weights."""
+ selected_goal, selected_rubric = random.choices(
+ self._goals, weights=self._goal_weights, k=1
+ )[0]
+ return selected_goal, selected_rubric
+
+
+class RealBusinessGraphGoalGenerator(GoalGenerator):
+ """Goal generator for real financial graph data.
+
+ Goals are written as QA-style descriptions over the actual
+ entity / relation types present in the CSV graph, so that
+ g explicitly reflects the observed schema.
+ """
+
+ def __init__(self, node_types: set[str], edge_labels: set[str]):
+ self._node_types = node_types
+ self._edge_labels = edge_labels
+
+ person = "Person" if "Person" in node_types else "person node"
+ company = "Company" if "Company" in node_types else "company node"
+ account = "Account" if "Account" in node_types else "account node"
+ loan = "Loan" if "Loan" in node_types else "loan node"
+
+ invest = "invest" if "invest" in edge_labels else "invest relation"
+ guarantee = (
+ "guarantee" if "guarantee" in edge_labels else "guarantee relation"
+ )
+ transfer = "transfer" if "transfer" in edge_labels else "transfer
relation"
+ withdraw = "withdraw" if "withdraw" in edge_labels else "withdraw
relation"
+ repay = "repay" if "repay" in edge_labels else "repay relation"
+ deposit = "deposit" if "deposit" in edge_labels else "deposit relation"
+ apply = "apply" if "apply" in edge_labels else "apply relation"
+ own = "own" if "own" in edge_labels else "ownership relation"
+
+ # Construct goals aligned to observable relations in the real graph.
+ self._goals = [
+ (
+ f"""Given a {person}, walk along {invest} / {own} /
{guarantee} / {apply} edges to reach related {company} or {loan} nodes and
return representative paths.""", # noqa: E501
+ f"""Score is based on whether a path connects a {person} to a
{company} or {loan}. Bonus for using multiple relation types and 2-4 hop paths.
Single-hop paths score lower.""", # noqa: E501
+ ),
+ (
+ f"""Starting from an {account}, follow {transfer} / {withdraw}
/ {repay} / {deposit} edges to trace money flows and reach a {loan} or another
{account} within 2-4 hops.""", # noqa: E501
+ f"""Score is based on staying on transaction edges and
reaching a {loan} or a multi-hop {account} chain. Paths that stop immediately
or use unrelated links score lower.""", # noqa: E501
+ ),
+ (
+ f"""For a single {company}, traverse {own} and {apply}
relations to reach both {account} and {loan} nodes, and include {guarantee} if
available.""", # noqa: E501
+ f"""Score is based on covering ownership and loan-related
steps in the same path. Higher scores for paths that include both {account} and
{loan} and use {guarantee}.""", # noqa: E501
+ ),
+ (
+ f"""Between {person} and {company} nodes, find short chains
using {invest} / {own} / {guarantee} relations to explain related-party
links.""", # noqa: E501
+ f"""Score is based on discovering paths that include both
{person} and {company} within 2-3 steps. Using more than one relation type
increases the score.""", # noqa: E501
+ ),
+ (
+ f"""From a {company}, explore multi-hop {invest} or
{guarantee} relations to reach multiple other {company} nodes and summarize the
cluster.""", # noqa: E501
+ f"""Score increases with the number of distinct {company}
nodes reached within 2-4 hops. Simple single-edge paths score lower.""", #
noqa: E501
+ ),
+ (
+ f"""Starting at a {loan}, follow incoming {repay} links to
{account} nodes, then use incoming {own} links to reach related {person} or
{company} owners.""", # noqa: E501
+ f"""Score is based on reaching at least one owner ({person} or
{company}) via {repay} -> {own} within 2-3 hops. Paths that end at {account}
score lower.""", # noqa: E501
+ ),
+ ]
+
+ # Heuristic weight distribution; can be tuned by future statistics
+ self._goal_weights = [100, 90, 80, 70, 60, 50]
+
+ @property
+ def goal_texts(self) -> list[str]:
+ return [g[0] for g in self._goals]
+
+ @property
+ def goal_weights(self) -> list[int]:
+ return self._goal_weights.copy()
+
+ def select_goal(self, node_type: str | None = None) -> tuple[str, str]:
+ """Weighted random selection; optionally bias by node_type.
+
+ If ``node_type`` is provided, slightly bias towards goals whose
+ text mentions that type; otherwise fall back to simple
+ weighted random sampling over all goals.
+ """
+
+ # Simple heuristic: filter a small candidate subset by node_type
+ candidates: list[tuple[str, str]] = self._goals
+ weights: list[int] = self._goal_weights
+
+ if node_type is not None:
+ node_type_lower = node_type.lower()
+ filtered: list[tuple[tuple[str, str], int]] = []
+
+ for goal_tuple, w in zip(self._goals, self._goal_weights,
strict=False):
+ text = goal_tuple[0]
+ if node_type_lower in text.lower():
+ # 同类型的目标权重放大一些
Review Comment:
Switch comments to English
##########
geaflow-ai/src/operator/casts/casts/data/sources.py:
##########
@@ -0,0 +1,942 @@
+"""Data source implementations for CASTS system.
+
+This module provides concrete implementations of the DataSource interface
+for both synthetic and real data sources.
+"""
+
+from collections import deque
+import csv
+from pathlib import Path
+import random
+from typing import Any
+
+import networkx as nx
+
+from casts.core.config import DefaultConfiguration
+from casts.core.interfaces import Configuration, DataSource, GoalGenerator,
GraphSchema
+from casts.core.schema import InMemoryGraphSchema
+
+
+class SyntheticBusinessGraphGoalGenerator(GoalGenerator):
+ """Goal generator for (Synthetic) business/financial graphs."""
+
+ def __init__(self):
+ # Emphasize multi-hop + relation types to give the LLM
+ # a clearer signal about traversable edges.
+ self._goals = [
+ (
+ "Map how risk propagates through multi-hop business "
+ "relationships (friend, supplier, partner, investor, "
+ "customer) based on available data",
+ "Score is based on the number of hops and the variety of
relationship types "
+ "(friend, supplier, partner, etc.) traversed. Paths that stay
within one "
+ "relationship type are less valuable.",
+ ),
+ (
+ "Discover natural community structures that emerge from "
+ "active entity interactions along friend and partner "
+ "relationships",
+ "Score is based on the density of connections found. Paths
that identify nodes "
+ "with many shared 'friend' or 'partner' links are more
valuable. Simple long "
+ "chains are less valuable.",
+ ),
+ (
+ "Recommend smarter supplier alternatives by walking "
+ "along supplier and customer chains and learning from "
+ "historical risk-category patterns",
+ "Score is based on ability to traverse 'supplier' and
'customer' chains. "
+ "The longer the chain, the better. Paths that don't follow
these "
+ "relationships should be penalized.",
+ ),
+ (
+ "Trace fraud signals across investor / partner / customer "
+ "relationship chains using real-time metrics, without "
+ "assuming globally optimal paths",
+ "Score is based on the length and complexity of chains
involving 'investor', "
+ "'partner', and 'customer' relationships. Paths that connect
disparate parts "
+ "of the graph are more valuable.",
+ ),
+ (
+ "Uncover hidden cross-region business connections through "
+ "accumulated domain knowledge and repeated traversals over "
+ "friend / partner edges",
+ "Score is based on the ability to connect nodes from different
'region' "
+ "properties using 'friend' or 'partner' edges. A path that
starts in 'NA' "
+ "and ends in 'EU' is high value.",
+ ),
+ ]
+ self._goal_weights = [100, 60, 40, 25, 15]
+
+ @property
+ def goal_texts(self) -> list[str]:
+ return [g[0] for g in self._goals]
+
+ @property
+ def goal_weights(self) -> list[int]:
+ return self._goal_weights.copy()
+
+ def select_goal(self, node_type: str | None = None) -> tuple[str, str]:
+ """Select a goal and its rubric based on weights."""
+ selected_goal, selected_rubric = random.choices(
+ self._goals, weights=self._goal_weights, k=1
+ )[0]
+ return selected_goal, selected_rubric
+
+
+class RealBusinessGraphGoalGenerator(GoalGenerator):
+ """Goal generator for real financial graph data.
+
+ Goals are written as QA-style descriptions over the actual
+ entity / relation types present in the CSV graph, so that
+ g explicitly reflects the observed schema.
+ """
+
+ def __init__(self, node_types: set[str], edge_labels: set[str]):
+ self._node_types = node_types
+ self._edge_labels = edge_labels
+
+ person = "Person" if "Person" in node_types else "person node"
+ company = "Company" if "Company" in node_types else "company node"
+ account = "Account" if "Account" in node_types else "account node"
+ loan = "Loan" if "Loan" in node_types else "loan node"
+
+ invest = "invest" if "invest" in edge_labels else "invest relation"
+ guarantee = (
+ "guarantee" if "guarantee" in edge_labels else "guarantee relation"
+ )
+ transfer = "transfer" if "transfer" in edge_labels else "transfer
relation"
+ withdraw = "withdraw" if "withdraw" in edge_labels else "withdraw
relation"
+ repay = "repay" if "repay" in edge_labels else "repay relation"
+ deposit = "deposit" if "deposit" in edge_labels else "deposit relation"
+ apply = "apply" if "apply" in edge_labels else "apply relation"
+ own = "own" if "own" in edge_labels else "ownership relation"
+
+ # Construct goals aligned to observable relations in the real graph.
+ self._goals = [
+ (
+ f"""Given a {person}, walk along {invest} / {own} /
{guarantee} / {apply} edges to reach related {company} or {loan} nodes and
return representative paths.""", # noqa: E501
+ f"""Score is based on whether a path connects a {person} to a
{company} or {loan}. Bonus for using multiple relation types and 2-4 hop paths.
Single-hop paths score lower.""", # noqa: E501
+ ),
+ (
+ f"""Starting from an {account}, follow {transfer} / {withdraw}
/ {repay} / {deposit} edges to trace money flows and reach a {loan} or another
{account} within 2-4 hops.""", # noqa: E501
+ f"""Score is based on staying on transaction edges and
reaching a {loan} or a multi-hop {account} chain. Paths that stop immediately
or use unrelated links score lower.""", # noqa: E501
+ ),
+ (
+ f"""For a single {company}, traverse {own} and {apply}
relations to reach both {account} and {loan} nodes, and include {guarantee} if
available.""", # noqa: E501
+ f"""Score is based on covering ownership and loan-related
steps in the same path. Higher scores for paths that include both {account} and
{loan} and use {guarantee}.""", # noqa: E501
+ ),
+ (
+ f"""Between {person} and {company} nodes, find short chains
using {invest} / {own} / {guarantee} relations to explain related-party
links.""", # noqa: E501
+ f"""Score is based on discovering paths that include both
{person} and {company} within 2-3 steps. Using more than one relation type
increases the score.""", # noqa: E501
+ ),
+ (
+ f"""From a {company}, explore multi-hop {invest} or
{guarantee} relations to reach multiple other {company} nodes and summarize the
cluster.""", # noqa: E501
+ f"""Score increases with the number of distinct {company}
nodes reached within 2-4 hops. Simple single-edge paths score lower.""", #
noqa: E501
+ ),
+ (
+ f"""Starting at a {loan}, follow incoming {repay} links to
{account} nodes, then use incoming {own} links to reach related {person} or
{company} owners.""", # noqa: E501
+ f"""Score is based on reaching at least one owner ({person} or
{company}) via {repay} -> {own} within 2-3 hops. Paths that end at {account}
score lower.""", # noqa: E501
+ ),
+ ]
+
+ # Heuristic weight distribution; can be tuned by future statistics
+ self._goal_weights = [100, 90, 80, 70, 60, 50]
+
+ @property
+ def goal_texts(self) -> list[str]:
+ return [g[0] for g in self._goals]
+
+ @property
+ def goal_weights(self) -> list[int]:
+ return self._goal_weights.copy()
+
+ def select_goal(self, node_type: str | None = None) -> tuple[str, str]:
+ """Weighted random selection; optionally bias by node_type.
+
+ If ``node_type`` is provided, slightly bias towards goals whose
+ text mentions that type; otherwise fall back to simple
+ weighted random sampling over all goals.
+ """
+
+ # Simple heuristic: filter a small candidate subset by node_type
+ candidates: list[tuple[str, str]] = self._goals
+ weights: list[int] = self._goal_weights
+
+ if node_type is not None:
+ node_type_lower = node_type.lower()
+ filtered: list[tuple[tuple[str, str], int]] = []
+
+ for goal_tuple, w in zip(self._goals, self._goal_weights,
strict=False):
+ text = goal_tuple[0]
+ if node_type_lower in text.lower():
+ # 同类型的目标权重放大一些
+ filtered.append((goal_tuple, w * 2))
+
+ if filtered:
+ c_tuple, w_tuple = zip(*filtered, strict=False)
+ candidates = list(c_tuple)
+ weights = list(w_tuple)
+
+ selected_goal, selected_rubric = random.choices(
+ candidates, weights=weights, k=1
+ )[0]
+ return selected_goal, selected_rubric
+
+
+class SyntheticDataSource(DataSource):
+ """Synthetic graph data source with Zipf distribution."""
+
+ def __init__(self, size: int = 30):
+ """Initialize synthetic data source.
+
+ Args:
+ size: Number of nodes to generate
+ """
+ self._nodes: dict[str, dict[str, Any]] = {}
+ self._edges: dict[str, list[dict[str, str]]] = {}
+ self._source_label = "synthetic"
+ # NOTE: For synthetic graphs we assume the generated data is immutable
+ # after initialization. If you mutate `nodes` / `edges` at runtime, you
+ # must call `get_schema()` again so a fresh InMemoryGraphSchema (and
+ # fingerprint) is built.
+ self._goal_generator: GoalGenerator | None = None
+ self._generate_zipf_data(size)
+ self._schema = InMemoryGraphSchema(self._nodes, self._edges)
+ self._goal_generator = SyntheticBusinessGraphGoalGenerator()
+
+ @property
+ def nodes(self) -> dict[str, dict[str, Any]]:
+ return self._nodes
+
+ @property
+ def edges(self) -> dict[str, list[dict[str, str]]]:
+ return self._edges
+
+ @property
+ def source_label(self) -> str:
+ return self._source_label
+
+ def get_node(self, node_id: str) -> dict[str, Any] | None:
+ return self._nodes.get(node_id)
+
+ def get_neighbors(self, node_id: str, edge_label: str | None = None) ->
list[str]:
+ """Get neighbor node IDs for a given node."""
+ if node_id not in self._edges:
+ return []
+
+ neighbors = []
+ for edge in self._edges[node_id]:
+ if edge_label is None or edge['label'] == edge_label:
+ neighbors.append(edge['target'])
+ return neighbors
+
+ def get_schema(self) -> GraphSchema:
+ """Get the graph schema for this data source."""
+ if self._schema is None:
+ self._schema = InMemoryGraphSchema(self._nodes, self._edges)
+ return self._schema
+
+ def get_goal_generator(self) -> GoalGenerator:
+ """Get the goal generator for this data source."""
+ if self._goal_generator is None:
+ self._goal_generator = SyntheticBusinessGraphGoalGenerator()
+ return self._goal_generator
+
+ def get_starting_nodes(
+ self,
+ goal: str,
+ recommended_node_types: list[str],
+ count: int,
+ min_degree: int = 2,
+ ) -> list[str]:
+ """Select starting nodes using LLM-recommended node types.
+
+ For synthetic data, this is straightforward because all nodes
+ are guaranteed to have at least 1 outgoing edge by construction.
+
+ Args:
+ goal: The traversal goal text (for logging)
+ recommended_node_types: Node types recommended by LLM
+ count: Number of starting nodes to return
+ min_degree: Minimum outgoing degree for fallback selection
+
+ Returns:
+ List of node IDs suitable for starting traversal
+ """
+ # Tier 1: LLM-recommended node types
+ if recommended_node_types:
+ candidates = [
+ node_id
+ for node_id, node in self._nodes.items()
+ if node.get("type") in recommended_node_types
+ ]
+
+ if len(candidates) >= count:
+ return random.sample(candidates, k=count)
+
+ # Tier 2: Degree-based fallback
+ candidates = [
+ node_id
+ for node_id in self._nodes.keys()
+ if len(self._edges.get(node_id, [])) >= min_degree
+ ]
+
+ if len(candidates) >= count:
+ return random.sample(candidates, k=count)
+
+ # Tier 3: Emergency fallback - any nodes with at least 1 edge
+ candidates = [
+ node_id for node_id in self._nodes.keys() if
len(self._edges.get(node_id, [])) >= 1
+ ]
+
+ if len(candidates) >= count:
+ return random.sample(candidates, k=count)
+
+ # Last resort: take any nodes
+ all_nodes = list(self._nodes.keys())
+ if len(all_nodes) >= count:
+ return random.sample(all_nodes, k=count)
+
+ return all_nodes
+
+ def _generate_zipf_data(self, size: int):
+ """Generate synthetic data following Zipf distribution."""
+ business_types = [
+ 'Retail SME',
+ 'Logistics Partner',
+ 'Enterprise Vendor',
+ 'Regional Distributor',
+ 'FinTech Startup',
+ ]
+ type_weights = [100, 50, 25, 12, 6]
+
+ business_categories = ['retail', 'wholesale', 'finance',
'manufacturing']
+ regions = ['NA', 'EU', 'APAC', 'LATAM']
+ risk_levels = ['low', 'medium', 'high']
+
+ # Generate nodes
+ for i in range(size):
+ node_type = random.choices(business_types, weights=type_weights,
k=1)[0]
+ status = 'active' if random.random() < 0.8 else 'inactive'
+ age = random.randint(18, 60)
+
+ node = {
+ 'id': str(i),
+ 'type': node_type,
+ 'category': random.choice(business_categories),
+ 'region': random.choice(regions),
+ 'risk': random.choice(risk_levels),
+ 'status': status,
+ 'age': age,
+ }
+ self._nodes[str(i)] = node
+
+ # Generate edges with more structured, denser relationship patterns
+ edge_labels = ['friend', 'supplier', 'partner', 'investor', 'customer']
+
+ # 基础随机度:保证每个点有一定随机边
+ for i in range(size):
+ base_degree = random.randint(1, 3) # 原来是 0~3,现在保证至少 1 条
+ for _ in range(base_degree):
+ target_id = str(random.randint(0, size - 1))
+ if target_id == str(i):
+ continue
+ label = random.choice(edge_labels)
+ edge = {'target': target_id, 'label': label}
+ self._edges.setdefault(str(i), []).append(edge)
+
+ # 结构性“偏好”:不同业务类型偏向某些关系,有利于 LLM 学习到稳定模板
Review Comment:
Same as above. Code comments should be in English, while bilingual
documentation (Chinese and English) can be provided separately.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]