This is an automated email from the ASF dual-hosted git repository.

yiconghuang pushed a commit to branch port-based-workflow-exp
in repository https://gitbox.apache.org/repos/asf/texera.git

commit d4896a054ca356209674c4a3a0abc612ac5cc00e
Author: Yicong Huang <[email protected]>
AuthorDate: Sun Mar 1 23:59:02 2026 -0800

    chore(python_compiling_service): add canonical good use case and clean module boundaries
---
 .gitignore                                         |  14 ++
 core/python_compiling_service/.gitignore           |   4 +
 core/python_compiling_service/docs/architecture.md |   7 +
 .../python_compiling_service/docs/good_use_case.md |  48 ++++
 .../examples/good_use_case.py                      |  33 +++
 .../src/compiler/README.md                         |   1 +
 .../src/compiler/__init__.py                       |   3 +
 .../src/compiler/use_cases.py                      |  21 ++
 core/python_compiling_service/src/port_detector.py | 260 +++------------------
 .../src/udf_compiling_service.py                   |  57 ++---
 .../tests/test_good_use_case.py                    |  30 +++
 .../tests/test_service_api.py                      |  11 +
 12 files changed, 215 insertions(+), 274 deletions(-)

diff --git a/.gitignore b/.gitignore
index 627c56f087..2230427d39 100644
--- a/.gitignore
+++ b/.gitignore
@@ -125,3 +125,17 @@ values-*.yaml    # Any environment-specific value overrides
 # Ignore nested node modules
 **/node_modules/
 **/package-lock.json
+
+# Local frontend build/cache artifacts (top-level frontend workspace)
+frontend/.angular/
+frontend/.nx/
+frontend/dist/
+frontend/dist-server/
+frontend/tmp/
+frontend/out-tsc/
+frontend/src/environments/version.ts
+frontend/.pnp.*
+frontend/.yarn/install-state.gz
+
+# Local amber debug artifact
+amber/physicalPlan.json
diff --git a/core/python_compiling_service/.gitignore 
b/core/python_compiling_service/.gitignore
new file mode 100644
index 0000000000..2e26d63d7e
--- /dev/null
+++ b/core/python_compiling_service/.gitignore
@@ -0,0 +1,4 @@
+.venv/
+.pytest_cache/
+__pycache__/
+*.py[cod]
diff --git a/core/python_compiling_service/docs/architecture.md 
b/core/python_compiling_service/docs/architecture.md
index 71ef84995a..17cedec0f7 100644
--- a/core/python_compiling_service/docs/architecture.md
+++ b/core/python_compiling_service/docs/architecture.md
@@ -73,3 +73,10 @@ flowchart LR
     B7 --> B8
     B7 --> B9
 ```
+
+## Recommended Use Case
+
+For a concrete compile input and expected output shape, use:
+
+- `core/python_compiling_service/docs/good_use_case.md`
+- `core/python_compiling_service/examples/good_use_case.py`
diff --git a/core/python_compiling_service/docs/good_use_case.md 
b/core/python_compiling_service/docs/good_use_case.md
new file mode 100644
index 0000000000..09168b9df9
--- /dev/null
+++ b/core/python_compiling_service/docs/good_use_case.md
@@ -0,0 +1,48 @@
+# Good Use Case: Auto-Cut UDF Compile
+
+This is the recommended path for production usage:
+
+1. Submit a pandas-style UDF with 2+ input tables.
+2. Do not add a `#<line>` directive.
+3. Let the compiler pick the best split point automatically.
+
+## Why this is the default
+
+- It keeps behavior stable across UDF edits.
+- It preserves compile flexibility (ranking + fallback cuts).
+- It avoids hard-coding line numbers that drift over time.
+
+## Example code
+
+The canonical sample is defined in:
+
+- `src/compiler/use_cases.py` as `RECOMMENDED_AUTO_CUT_UDF`
+
+## Run the local example
+
+```bash
+cd core/python_compiling_service
+.venv/bin/python examples/good_use_case.py
+```
+
+Expected summary shape:
+
+- `num_args: 2`
+- `baseline_mode: False`
+- `ranked_cuts: [...]` (non-empty)
+- `cuts_used: [...]` (first selected cut)
+- generated operator class containing `process_table_0` and `process_table_1`
+
+## HTTP service usage
+
+```bash
+curl -s -X POST http://localhost:9999/compile \
+  -H 'Content-Type: application/json' \
+  -d @- <<'JSON'
+{
+  "code": "import pandas as pd\n\ndef score_events(df_events: pd.DataFrame, df_weights: pd.DataFrame) -> pd.DataFrame:\n    active = df_events[df_events[\"event\"] != \"idle\"]\n    joined = pd.merge(active, df_weights, on=\"event\", how=\"left\")\n    joined[\"score\"] = joined[\"count\"] * joined[\"weight\"].fillna(0)\n    result = joined[[\"user_id\", \"event\", \"score\"]]\n    return result\n"
+}
+JSON
+```
+
+The response is plain text Python code for the generated operator class.
diff --git a/core/python_compiling_service/examples/good_use_case.py 
b/core/python_compiling_service/examples/good_use_case.py
new file mode 100644
index 0000000000..2f9d9e1079
--- /dev/null
+++ b/core/python_compiling_service/examples/good_use_case.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python3
+"""Recommended end-to-end example for compiling a two-input pandas UDF."""
+
+import os
+import sys
+
+
+CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
+SRC_DIR = os.path.join(os.path.dirname(CURRENT_DIR), "src")
+if SRC_DIR not in sys.path:
+    sys.path.insert(0, SRC_DIR)
+
+from compiler import RECOMMENDED_AUTO_CUT_UDF, compile_udf
+
+
+def main() -> int:
+    result = compile_udf(RECOMMENDED_AUTO_CUT_UDF)
+    ranked_lines = [cut["line_number"] for cut in result.ranked_cuts]
+    selected_lines = [cut["line_number"] for cut in result.cuts_used]
+
+    print("=== Recommended UDF compile use case ===")
+    print(f"num_args: {result.num_args}")
+    print(f"baseline_mode: {result.baseline_mode}")
+    print(f"ranked_cuts: {ranked_lines}")
+    print(f"cuts_used: {selected_lines}")
+    print(f"process_tables: {sorted(result.process_tables.keys())}")
+    print("\nGenerated operator class:\n")
+    print(result.operator_class)
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/core/python_compiling_service/src/compiler/README.md 
b/core/python_compiling_service/src/compiler/README.md
index e406070996..271706fa8a 100644
--- a/core/python_compiling_service/src/compiler/README.md
+++ b/core/python_compiling_service/src/compiler/README.md
@@ -5,6 +5,7 @@ This package is the canonical implementation layer for UDF 
compilation.
 - `models.py`: typed request/response dataclasses.
 - `config.py`: shared constants for cut heuristics and type-size defaults.
 - `common.py`: lightweight helpers (line directive parsing, variable-name normalization).
+- `use_cases.py`: canonical sample UDFs used by docs/service/tests.
 - `facade.py`: stable entrypoint `compile_udf(code, line_number)` plus `compile_udf_legacy(...)` for dict output.
 - `orchestrator.py`: non-baseline compile pipeline orchestration with injectable steps.
 - `pipeline.py` / `cut_strategy.py` / `ssa_transform.py` / `baseline.py`: 
extracted behavior-preserving
diff --git a/core/python_compiling_service/src/compiler/__init__.py 
b/core/python_compiling_service/src/compiler/__init__.py
index f4964e7d9c..3d12965440 100644
--- a/core/python_compiling_service/src/compiler/__init__.py
+++ b/core/python_compiling_service/src/compiler/__init__.py
@@ -30,6 +30,7 @@ from .splitter import 
apply_loop_transformation_to_process_table, generate_proce
 from .ssa_core import SSA, SSATransformer
 from .ssa_transform import convert_ssa_to_self
 from .type_inference import infer_types_from_code
+from .use_cases import BASELINE_REFERENCE_UDF, RECOMMENDED_AUTO_CUT_UDF
 
 __all__ = [
     "CompileRequest",
@@ -71,4 +72,6 @@ __all__ = [
     "SSA",
     "SSATransformer",
     "convert_ssa_to_self",
+    "RECOMMENDED_AUTO_CUT_UDF",
+    "BASELINE_REFERENCE_UDF",
 ]
diff --git a/core/python_compiling_service/src/compiler/use_cases.py 
b/core/python_compiling_service/src/compiler/use_cases.py
new file mode 100644
index 0000000000..397b1612d2
--- /dev/null
+++ b/core/python_compiling_service/src/compiler/use_cases.py
@@ -0,0 +1,21 @@
+"""Canonical sample inputs for integration demos and regression checks."""
+
+RECOMMENDED_AUTO_CUT_UDF = """import pandas as pd
+
+def score_events(df_events: pd.DataFrame, df_weights: pd.DataFrame) -> 
pd.DataFrame:
+    active = df_events[df_events["event"] != "idle"]
+    joined = pd.merge(active, df_weights, on="event", how="left")
+    joined["score"] = joined["count"] * joined["weight"].fillna(0)
+    result = joined[["user_id", "event", "score"]]
+    return result
+"""
+
+
+BASELINE_REFERENCE_UDF = """#baseline
+import pandas as pd
+
+def score_events_baseline(df_events: pd.DataFrame, df_weights: pd.DataFrame) 
-> pd.DataFrame:
+    joined = pd.merge(df_events, df_weights, on="event", how="left")
+    joined["score"] = joined["count"] * joined["weight"].fillna(0)
+    return joined[["user_id", "event", "score"]]
+"""
diff --git a/core/python_compiling_service/src/port_detector.py 
b/core/python_compiling_service/src/port_detector.py
index d67ebaa51f..05619ce510 100644
--- a/core/python_compiling_service/src/port_detector.py
+++ b/core/python_compiling_service/src/port_detector.py
@@ -1,17 +1,17 @@
 import ast
-# import astviz  # Only import in __main__
+from collections import defaultdict
 
 def detect_ports_and_classify_statements(source_code: str):
+    """Infer statement-to-port ownership from a simple AST walk."""
     tree = ast.parse(source_code)
-    # astviz.view(tree)  # Remove from here
     fn_def = tree.body[0]
-    
-    # Step 1: Identify input arguments -> ports
+
+    # Step 1: Identify input arguments -> ports.
     arg_ports = [arg.arg for arg in fn_def.args.args]
     port_origin = {arg: f"{arg}_port" for arg in arg_ports}
     var_to_port = {}
 
-    # Step 2: Direct unpacking from input (e.g., X_train, y_train = train_set)
+    # Step 2: Direct unpacking from input (e.g., X_train, y_train = train_set).
     for stmt in fn_def.body:
         if isinstance(stmt, ast.Assign) and isinstance(stmt.value, ast.Name):
             source = stmt.value.id
@@ -22,7 +22,7 @@ def detect_ports_and_classify_statements(source_code: str):
                         if isinstance(elt, ast.Name):
                             var_to_port[elt.id] = port_origin[source]
 
-    # Step 3: Propagate port origin to derived variables
+    # Step 3: Propagate port origin to derived variables.
     def propagate_variable_origins(fn_body):
         for stmt in fn_body:
             lhs_vars = set()
@@ -41,7 +41,7 @@ def detect_ports_and_classify_statements(source_code: str):
 
     propagate_variable_origins(fn_def.body)
 
-    # Step 4: Classify each statement
+    # Step 4: Classify each statement.
     stmt_to_port = []
     for stmt in fn_def.body:
         stmt_vars = {node.id for node in ast.walk(stmt) if isinstance(node, 
ast.Name)}
@@ -59,12 +59,10 @@ def detect_ports_and_classify_statements(source_code: str):
 
     return port_origin, stmt_to_port
 
-import pandas as pd
-import astroid
-
-from collections import defaultdict
 
 def build_dependency_graph(function_node):
+    import astroid
+
     dependency_graph = defaultdict(set)
     for stmt in function_node.body:
         if isinstance(stmt, astroid.Assign):
@@ -78,6 +76,7 @@ def build_dependency_graph(function_node):
                 dependency_graph[node.name].update(rhs_vars)
     return dependency_graph
 
+
 def backward_propagate_ports(var_port_map, dependency_graph):
     updated = True
     while updated:
@@ -85,36 +84,43 @@ def backward_propagate_ports(var_port_map, 
dependency_graph):
         for var, deps in dependency_graph.items():
             dep_ports = {var_port_map.get(dep) for dep in deps if dep in 
var_port_map}
             dep_ports.discard(None)
-            if len(dep_ports) == 1 and (var not in var_port_map or 
var_port_map[var] != next(iter(dep_ports))):
+            if len(dep_ports) == 1 and (
+                var not in var_port_map or var_port_map[var] != 
next(iter(dep_ports))
+            ):
                 var_port_map[var] = next(iter(dep_ports))
                 updated = True
 
+
 def label_statements_by_port(code: str):
     import astroid
     import pandas as pd
+
     module = astroid.parse(code)
     function_node = next((n for n in module.body if isinstance(n, 
astroid.FunctionDef)), None)
     if function_node is None:
         raise ValueError("No function definition found.")
 
-    # Step 1: Arguments as ports
+    # Step 1: Arguments as ports.
     arg_names = [arg.name for arg in function_node.args.args]
     arg_port_map = {arg: f"{arg}_port" for arg in arg_names}
-    var_port_map = dict(arg_port_map)  # Dye map: variable -> port or 'shared'
+    var_port_map = dict(arg_port_map)  # variable -> port or "shared"
 
     statement_ports = []
     for stmt in function_node.body:
-        # Find variables being assigned (Store)
         assigned_vars = {node.name for node in 
stmt.nodes_of_class(astroid.AssignName)}
-        # Find variables being loaded (used)
         loaded_vars = {node.name for node in stmt.nodes_of_class(astroid.Name)}
         loaded_vars -= assigned_vars
-        # Determine the port(s) of the loaded variables
+
         loaded_arg_ports = {arg_port_map.get(var) for var in loaded_vars if 
var in arg_port_map}
         loaded_arg_ports.discard(None)
-        loaded_local_ports = {var_port_map.get(var) for var in loaded_vars if 
var in var_port_map and var not in arg_port_map}
+        loaded_local_ports = {
+            var_port_map.get(var)
+            for var in loaded_vars
+            if var in var_port_map and var not in arg_port_map
+        }
         loaded_local_ports.discard(None)
-        # 染色逻辑:优先 argument 的 port
+
+        # Coloring rule: prioritize argument-origin ports.
         for var in assigned_vars:
             if loaded_arg_ports:
                 if len(loaded_arg_ports) == 1:
@@ -126,8 +132,7 @@ def label_statements_by_port(code: str):
                     var_port_map[var] = next(iter(loaded_local_ports))
                 else:
                     var_port_map[var] = "shared"
-            # else: do not assign if no clear port
-        # Assign statement to port
+
         if assigned_vars:
             stmt_ports = {var_port_map.get(var) for var in assigned_vars if 
var in var_port_map}
             stmt_ports.discard(None)
@@ -138,7 +143,6 @@ def label_statements_by_port(code: str):
             else:
                 label = "global"
         else:
-            # If only loading, assign to the port(s) of the loaded variables
             all_ports = loaded_arg_ports | loaded_local_ports
             if len(all_ports) == 1:
                 label = next(iter(all_ports))
@@ -146,25 +150,22 @@ def label_statements_by_port(code: str):
                 label = "shared"
             else:
                 label = "global"
+
         statement_ports.append((stmt.lineno, stmt.as_string(), label))
 
     df = pd.DataFrame(statement_ports, columns=["Line", "Statement", "Port 
Assignment"])
 
-    # --- Backward propagation for global assignments ---
-    # 1. Find all variables assigned as global
+    # Backward propagation for global assignments.
     global_vars = set()
     stmt_lineno_to_var = {}
-    for i, (lineno, stmt_str, label) in enumerate(statement_ports):
+    for i, (lineno, _stmt_str, label) in enumerate(statement_ports):
         if label == "global":
-            # Try to extract assigned variable(s) from the statement string
-            # (astroid already gives us assigned_vars per statement)
             stmt = function_node.body[i]
             assigned_vars = {node.name for node in 
stmt.nodes_of_class(astroid.AssignName)}
             for var in assigned_vars:
                 global_vars.add(var)
                 stmt_lineno_to_var[lineno] = var
 
-    # 2. For each global var, find which port's statements use it
     var_used_by_ports = {var: set() for var in global_vars}
     for i, stmt in enumerate(function_node.body):
         loaded_vars = {node.name for node in stmt.nodes_of_class(astroid.Name)}
@@ -174,218 +175,17 @@ def label_statements_by_port(code: str):
                 if port_label not in ("global", "shared"):
                     var_used_by_ports[var].add(port_label)
 
-    # 3. Update variable and assignment statement port if only used by one port
     for var, ports in var_used_by_ports.items():
         if len(ports) == 1:
             port = next(iter(ports))
-            # Update all assignment statements for this var
             for i, (lineno, stmt_str, label) in enumerate(statement_ports):
                 if stmt_lineno_to_var.get(lineno) == var and label == "global":
                     statement_ports[i] = (lineno, stmt_str, port)
-            # Update var_port_map for this var
             var_port_map[var] = port
         elif len(ports) > 1:
-            # If used by multiple ports, mark as shared
             for i, (lineno, stmt_str, label) in enumerate(statement_ports):
                 if stmt_lineno_to_var.get(lineno) == var and label == "global":
                     statement_ports[i] = (lineno, stmt_str, "shared")
             var_port_map[var] = "shared"
-        # else: if not used, keep as global
-
-    df = pd.DataFrame(statement_ports, columns=["Line", "Statement", "Port 
Assignment"])
-    return df
-
-if __name__ == "__main__":
-#     source_code = """
-# def train_and_evaluate_model(train_set, test_set):
-#     X_train, y_train = train_set
-#     X_test, y_test = test_set
-
-#     model = LogisticRegression()
-#     model.fit(X_train, y_train)
-
-#     predictions = []
-#     for x in X_test:
-#         y_pred = model.predict(x)[0]
-#         predictions.append(y_pred)
-
-#     correct = 0
-#     total = len(y_test)
-#     for i, y_true in enumerate(y_test):
-#         if predictions[i] == y_true:
-#             correct += 1
-
-#     accuracy = correct / total
-#     return model, predictions, accuracy
-#     """
-
-    source_code = """
-def f(x, y):
-    model = train(x)
-    yield (model, "model")
-
-    for t in y:
-        z = model.predict(t)
-        yield z, "test"
-"""
-    # Print the AST of the source code
-    import ast
-    print(ast.dump(ast.parse(source_code), indent=4))
-
-    # Visualize the AST using graphviz
-    from graphviz import Digraph
-    def ast_to_graphviz(node, graph=None, parent=None):
-        import ast
-        import hashlib
-        # Color palette for variable nodes
-        color_palette = [
-            'lightblue', 'lightgreen', 'yellow', 'orange', 'pink', 'violet', 
'gold', 'cyan', 'magenta', 'salmon', 'khaki', 'plum', 'wheat', 'tan', 
'thistle', 'azure', 'beige', 'coral', 'ivory', 'lavender'
-        ]
-
-        if graph is None:
-            graph = Digraph()
-        node_id = str(id(node))
-        # Show variable names for ast.Name nodes, with color
-        if isinstance(node, ast.Name):
-            label = f"Name: {node.id}"
-            # Assign a color based on variable name
-            color_idx = int(hashlib.md5(node.id.encode()).hexdigest(), 16) % 
len(color_palette)
-            color = color_palette[color_idx]
-            graph.node(node_id, label, style='filled', fillcolor=color)
-        else:
-            label = type(node).__name__
-            graph.node(node_id, label)
-        if parent:
-            graph.edge(str(id(parent)), node_id)
-        for child in ast.iter_child_nodes(node):
-            ast_to_graphviz(child, graph, node)
-        return graph
-
-    tree = ast.parse(source_code)
-    # graph = ast_to_graphviz(tree)
-    # graph.render('ast_output', view=True)  # This will create ast_output.pdf 
and open it
-    graph.render('output/ast_output', view=True)  # This will create 
ast_output.pdf and open it
-
-    port_origin, stmt_to_port = 
detect_ports_and_classify_statements(source_code)
-    # print(port_origin)
-    # print(stmt_to_port)
-
-    df = label_statements_by_port(source_code)
-    print(df)
-
-    # --- Variable Dependency Graph ---
-    def build_variable_dependency_graph(source_code):
-        import ast
-        from graphviz import Digraph
-        tree = ast.parse(source_code)
-        fn_def = next((n for n in tree.body if isinstance(n, 
ast.FunctionDef)), None)
-        if fn_def is None:
-            raise ValueError("No function definition found.")
-        dep_graph = {}  # var: set of vars it depends on
-        # Add arguments as nodes
-        for arg in fn_def.args.args:
-            dep_graph[arg.arg] = set()
-        # Helper to process a list of statements recursively
-        def process_body(body):
-            for stmt in body:
-                # Assignments
-                if isinstance(stmt, ast.Assign):
-                    lhs_vars = set()
-                    for target in stmt.targets:
-                        if isinstance(target, ast.Name):
-                            lhs_vars.add(target.id)
-                        elif isinstance(target, ast.Tuple):
-                            for elt in target.elts:
-                                if isinstance(elt, ast.Name):
-                                    lhs_vars.add(elt.id)
-                    rhs_vars = {n.id for n in ast.walk(stmt.value) if 
isinstance(n, ast.Name)}
-                    for lhs in lhs_vars:
-                        if lhs not in dep_graph:
-                            dep_graph[lhs] = set()
-                        dep_graph[lhs].update(rhs_vars)
-                # For loops (target depends on iter), and process body 
recursively
-                elif isinstance(stmt, ast.For):
-                    if isinstance(stmt.target, ast.Name):
-                        iter_vars = {n.id for n in ast.walk(stmt.iter) if 
isinstance(n, ast.Name)}
-                        dep_graph.setdefault(stmt.target.id, 
set()).update(iter_vars)
-                    process_body(stmt.body)
-                # If, While, With, etc. (process their bodies recursively)
-                elif hasattr(stmt, 'body') and isinstance(stmt.body, list):
-                    process_body(stmt.body)
-                # Also process orelse blocks if present
-                if hasattr(stmt, 'orelse') and isinstance(stmt.orelse, list):
-                    process_body(stmt.orelse)
-        process_body(fn_def.body)
-        # Visualize
-        g = Digraph(comment="Variable Dependency Graph")
-        for var in dep_graph:
-            g.node(var)
-        for var, deps in dep_graph.items():
-            for dep in deps:
-                g.edge(dep, var)
-        return g
-
-    # Build and visualize the variable dependency graph
-    # var_graph = build_variable_dependency_graph(source_code)
-    # var_graph.render('variable_dependency_graph', view=True)
-    var_graph = build_variable_dependency_graph(source_code)
-    var_graph.render('output/variable_dependency_graph', view=True)
-
-    import ast
-
-class SSATransformer(ast.NodeTransformer):
-    def __init__(self):
-        self.version = {}
-    
-    def fresh(self, var):
-        self.version[var] = self.version.get(var, 0) + 1
-        return f"{var}_{self.version[var]}"
-    
-    def visit_Assign(self, node):
-        self.generic_visit(node)
-        if isinstance(node.targets[0], ast.Name):
-            var = node.targets[0].id
-            node.targets[0].id = self.fresh(var)
-        return node
-    
-    def visit_Name(self, node):
-        if isinstance(node.ctx, ast.Load) and node.id in self.version:
-            node.id = f"{node.id}_{self.version[node.id]}"
-        return node
-
-
-tree = ast.parse(source_code)
-ssa_tree = SSATransformer().visit(tree)
-import astor
-print(astor.to_source(ssa_tree))
-
-def build_ssa_dependency_graph_with_lineno(tree):
-    dep_graph = {}
-    node_labels = {}
-    for node in ast.walk(tree):
-        if isinstance(node, ast.Assign) and isinstance(node.targets[0], 
ast.Name):
-            lhs = node.targets[0].id
-            lineno = getattr(node, 'lineno', None)
-            label = f"{lhs} (line {lineno})" if lineno else lhs
-            node_labels[lhs] = label
-            rhs_vars = {n.id for n in ast.walk(node.value) if isinstance(n, 
ast.Name)}
-            dep_graph[lhs] = (rhs_vars, lineno)
-    return dep_graph, node_labels
-
-def visualize_dep_graph_with_lineno(dep_graph, node_labels):
-    from graphviz import Digraph
-    g = Digraph(comment="SSA Variable Dependency Graph with Line Numbers")
-    for var, label in node_labels.items():
-        g.node(var, label)
-    for var, (deps, _) in dep_graph.items():
-        for dep in deps:
-            if dep in node_labels:
-                g.edge(dep, var)
-    g.render('ssa_variable_dependency_graph', view=True)
-    g.render('output/ssa_variable_dependency_graph', view=True)
 
-# Example usage:
-tree = ast.parse(source_code)
-ssa_tree = SSATransformer().visit(tree)
-dep_graph, node_labels = build_ssa_dependency_graph_with_lineno(ssa_tree)
-visualize_dep_graph_with_lineno(dep_graph, node_labels)
\ No newline at end of file
+    return pd.DataFrame(statement_ports, columns=["Line", "Statement", "Port 
Assignment"])
diff --git a/core/python_compiling_service/src/udf_compiling_service.py 
b/core/python_compiling_service/src/udf_compiling_service.py
index 14b51473af..b65e35a48c 100644
--- a/core/python_compiling_service/src/udf_compiling_service.py
+++ b/core/python_compiling_service/src/udf_compiling_service.py
@@ -15,7 +15,12 @@ import os
 # Add the src directory to the path so we can import our modules
 sys.path.append(os.path.dirname(os.path.abspath(__file__)))
 
-from compiler import compile_udf, infer_line_number_from_code
+from compiler import (
+    BASELINE_REFERENCE_UDF,
+    RECOMMENDED_AUTO_CUT_UDF,
+    compile_udf,
+    infer_line_number_from_code,
+)
 
 app = Flask(__name__)
 CORS(app)  # Enable CORS for all routes
@@ -107,60 +112,24 @@ def compile_code_get():
 @app.route('/example', methods=['GET'])
 def get_example():
     """Get an example of the expected request format."""
-    
-    # Example 1: Normal compilation with specific line cut
-    example_code_with_cut = '''#5
-import pandas as pd
-
-def enrich_and_score(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
-    # Step 1: Filter df1
-    df1_filtered = df1[df1['activity'] != 'idle']
-    
-    # Step 2: Merge df1 with df2 on user_id
-    merged = pd.merge(df1_filtered, df2, on='user_id', how='inner')
-    
-    # Step 3: Define a simple activity -> value mapping
-    activity_points = {
-        'login': 1,
-        'logout': 0.5,
-        'purchase': 5,
-        'comment': 2
-    }
-    
-    # Step 4: Compute score = activity_value * group weight
-    merged['activity_value'] = 
merged['activity'].map(activity_points).fillna(0)
-    merged['score'] = merged['activity_value'] * merged['weight']
-    
-    return merged[['user_id', 'activity', 'group', 'score']]'''
-    
-    # Example 2: Baseline compilation
-    example_code_baseline = '''#baseline
-import pandas as pd
 
-def simple_function(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
-    merged = pd.merge(df1, df2, on='user_id', how='inner')
-    return merged[['user_id', 'activity', 'score']]'''
-    
-    # Example 3: Auto compilation (no line number)
-    example_code_auto = '''import pandas as pd
+    # Stable examples live under compiler/use_cases.py so docs, API, and tests 
stay in sync.
+    example_code_with_cut = f"#5\n{RECOMMENDED_AUTO_CUT_UDF}"
+    example_code_baseline = BASELINE_REFERENCE_UDF
+    example_code_auto = RECOMMENDED_AUTO_CUT_UDF
 
-def auto_compile(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
-    df1_filtered = df1[df1['activity'] != 'idle']
-    merged = pd.merge(df1_filtered, df2, on='user_id', how='inner')
-    return merged[['user_id', 'activity', 'score']]'''
-    
     examples = {
         'example_with_line_cut': {
             'code': example_code_with_cut,
-            'description': 'Compilation with specific line cut. The first line 
"#5" specifies to cut at line 5.'
+            'description': 'Compile with an explicit cut on line 5 to force 
process-table split position.'
         },
         'example_baseline': {
             'code': example_code_baseline,
-            'description': 'Baseline compilation. The first line "#baseline" 
creates a single process_tables method.'
+            'description': 'Baseline mode. "#baseline" forces a single 
process_tables method.'
         },
         'example_auto': {
             'code': example_code_auto,
-            'description': 'Auto compilation. No line number specified, will 
use optimal cuts based on dependency analysis.'
+            'description': 'Recommended default mode. No line prefix; compiler 
picks the cut automatically.'
         }
     }
     
diff --git a/core/python_compiling_service/tests/test_good_use_case.py 
b/core/python_compiling_service/tests/test_good_use_case.py
new file mode 100644
index 0000000000..0ffae1ad4b
--- /dev/null
+++ b/core/python_compiling_service/tests/test_good_use_case.py
@@ -0,0 +1,30 @@
+import os
+import sys
+
+# Add the project root to the Python path
+project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+sys.path.insert(0, project_root)
+
+from src.compiler import RECOMMENDED_AUTO_CUT_UDF, compile_udf
+
+
+def test_recommended_use_case_auto_cut_compile_shape():
+    result = compile_udf(RECOMMENDED_AUTO_CUT_UDF)
+
+    assert result.num_args == 2
+    assert result.baseline_mode is False
+
+    assert result.ranked_cuts, "Expected at least one ranked cut for the 
recommended use case"
+    assert result.cuts_used, "Expected at least one selected cut for the 
recommended use case"
+
+    ranked_lines = {cut["line_number"] for cut in result.ranked_cuts}
+    selected_line = result.cuts_used[0]["line_number"]
+    assert selected_line in ranked_lines
+
+    assert "class Operator(UDFGeneralOperator):" in result.operator_class
+    assert "def process_table_0(" in result.operator_class
+    assert "def process_table_1(" in result.operator_class
+    assert "yield {'result': self.result}" in result.operator_class
+
+    assert "process_table_0" in result.process_tables
+    assert "process_table_1" in result.process_tables
diff --git a/core/python_compiling_service/tests/test_service_api.py 
b/core/python_compiling_service/tests/test_service_api.py
index fdfa11948b..7b3069a71b 100644
--- a/core/python_compiling_service/tests/test_service_api.py
+++ b/core/python_compiling_service/tests/test_service_api.py
@@ -7,6 +7,7 @@ project_root = 
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 sys.path.insert(0, project_root)
 
 import src.udf_compiling_service as service
+from src.compiler import BASELINE_REFERENCE_UDF, RECOMMENDED_AUTO_CUT_UDF
 
 
 def test_post_compile_success(monkeypatch):
@@ -72,3 +73,13 @@ def test_get_compile_internal_error(monkeypatch):
     resp = client.get("/compile", query_string={"code": "a=1"})
     assert resp.status_code == 500
     assert "Internal server error: boom" in resp.get_data(as_text=True)
+
+
+def test_example_endpoint_uses_canonical_use_cases():
+    client = service.app.test_client()
+    resp = client.get("/example")
+
+    assert resp.status_code == 200
+    payload = resp.get_json()
+    assert payload["examples"]["example_auto"]["code"] == 
RECOMMENDED_AUTO_CUT_UDF
+    assert payload["examples"]["example_baseline"]["code"] == 
BASELINE_REFERENCE_UDF

Reply via email to