This is an automated email from the ASF dual-hosted git repository. yiconghuang pushed a commit to branch port-based-workflow-exp in repository https://gitbox.apache.org/repos/asf/texera.git
commit d4896a054ca356209674c4a3a0abc612ac5cc00e Author: Yicong Huang <[email protected]> AuthorDate: Sun Mar 1 23:59:02 2026 -0800 chore(python_compiling_service): add canonical good use case and clean module boundaries --- .gitignore | 14 ++ core/python_compiling_service/.gitignore | 4 + core/python_compiling_service/docs/architecture.md | 7 + .../python_compiling_service/docs/good_use_case.md | 48 ++++ .../examples/good_use_case.py | 33 +++ .../src/compiler/README.md | 1 + .../src/compiler/__init__.py | 3 + .../src/compiler/use_cases.py | 21 ++ core/python_compiling_service/src/port_detector.py | 260 +++------------------ .../src/udf_compiling_service.py | 57 ++--- .../tests/test_good_use_case.py | 30 +++ .../tests/test_service_api.py | 11 + 12 files changed, 215 insertions(+), 274 deletions(-) diff --git a/.gitignore b/.gitignore index 627c56f087..2230427d39 100644 --- a/.gitignore +++ b/.gitignore @@ -125,3 +125,17 @@ values-*.yaml # Any environment-specific value overrides # Ignore nested node modules **/node_modules/ **/package-lock.json + +# Local frontend build/cache artifacts (top-level frontend workspace) +frontend/.angular/ +frontend/.nx/ +frontend/dist/ +frontend/dist-server/ +frontend/tmp/ +frontend/out-tsc/ +frontend/src/environments/version.ts +frontend/.pnp.* +frontend/.yarn/install-state.gz + +# Local amber debug artifact +amber/physicalPlan.json diff --git a/core/python_compiling_service/.gitignore b/core/python_compiling_service/.gitignore new file mode 100644 index 0000000000..2e26d63d7e --- /dev/null +++ b/core/python_compiling_service/.gitignore @@ -0,0 +1,4 @@ +.venv/ +.pytest_cache/ +__pycache__/ +*.py[cod] diff --git a/core/python_compiling_service/docs/architecture.md b/core/python_compiling_service/docs/architecture.md index 71ef84995a..17cedec0f7 100644 --- a/core/python_compiling_service/docs/architecture.md +++ b/core/python_compiling_service/docs/architecture.md @@ -73,3 +73,10 @@ flowchart LR B7 --> B8 B7 --> B9 ``` + +## Recommended Use Case + +For a concrete compile input and expected output shape, use: + +- `core/python_compiling_service/docs/good_use_case.md` +- `core/python_compiling_service/examples/good_use_case.py` diff --git a/core/python_compiling_service/docs/good_use_case.md b/core/python_compiling_service/docs/good_use_case.md new file mode 100644 index 0000000000..09168b9df9 --- /dev/null +++ b/core/python_compiling_service/docs/good_use_case.md @@ -0,0 +1,48 @@ +# Good Use Case: Auto-Cut UDF Compile + +This is the recommended path for production usage: + +1. Submit a pandas-style UDF with 2+ input tables. +2. Do not add a `#<line>` directive. +3. Let the compiler pick the best split point automatically. + +## Why this is the default + +- It keeps behavior stable across UDF edits. +- It preserves compile flexibility (ranking + fallback cuts). +- It avoids hard-coding line numbers that drift over time. + +## Example code + +The canonical sample is defined in: + +- `src/compiler/use_cases.py` as `RECOMMENDED_AUTO_CUT_UDF` + +## Run the local example + +```bash +cd core/python_compiling_service +.venv/bin/python examples/good_use_case.py +``` + +Expected summary shape: + +- `num_args: 2` +- `baseline_mode: False` +- `ranked_cuts: [...]` (non-empty) +- `cuts_used: [...]` (first selected cut) +- generated operator class containing `process_table_0` and `process_table_1` + +## HTTP service usage + +```bash +curl -s -X POST http://localhost:9999/compile \ + -H 'Content-Type: application/json' \ + -d @- <<'JSON' +{ + "code": "import pandas as pd\n\ndef score_events(df_events: pd.DataFrame, df_weights: pd.DataFrame) -> pd.DataFrame:\n active = df_events[df_events[\"event\"] != \"idle\"]\n joined = pd.merge(active, df_weights, on=\"event\", how=\"left\")\n joined[\"score\"] = joined[\"count\"] * joined[\"weight\"].fillna(0)\n result = joined[[\"user_id\", \"event\", \"score\"]]\n return result\n" +} +JSON +``` + +The response is plain text Python code for the generated operator class. diff --git a/core/python_compiling_service/examples/good_use_case.py b/core/python_compiling_service/examples/good_use_case.py new file mode 100644 index 0000000000..2f9d9e1079 --- /dev/null +++ b/core/python_compiling_service/examples/good_use_case.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 +"""Recommended end-to-end example for compiling a two-input pandas UDF.""" + +import os +import sys + + +CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) +SRC_DIR = os.path.join(os.path.dirname(CURRENT_DIR), "src") +if SRC_DIR not in sys.path: + sys.path.insert(0, SRC_DIR) + +from compiler import RECOMMENDED_AUTO_CUT_UDF, compile_udf + + +def main() -> int: + result = compile_udf(RECOMMENDED_AUTO_CUT_UDF) + ranked_lines = [cut["line_number"] for cut in result.ranked_cuts] + selected_lines = [cut["line_number"] for cut in result.cuts_used] + + print("=== Recommended UDF compile use case ===") + print(f"num_args: {result.num_args}") + print(f"baseline_mode: {result.baseline_mode}") + print(f"ranked_cuts: {ranked_lines}") + print(f"cuts_used: {selected_lines}") + print(f"process_tables: {sorted(result.process_tables.keys())}") + print("\nGenerated operator class:\n") + print(result.operator_class) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/core/python_compiling_service/src/compiler/README.md b/core/python_compiling_service/src/compiler/README.md index e406070996..271706fa8a 100644 --- a/core/python_compiling_service/src/compiler/README.md +++ b/core/python_compiling_service/src/compiler/README.md @@ -5,6 +5,7 @@ This package is the canonical implementation layer for UDF compilation. - `models.py`: typed request/response dataclasses. - `config.py`: shared constants for cut heuristics and type-size defaults. - `common.py`: lightweight helpers (line directive parsing, variable-name normalization). +- `use_cases.py`: canonical sample UDFs used by docs/service/tests. - `facade.py`: stable entrypoint `compile_udf(code, line_number)` plus `compile_udf_legacy(...)` for dict output. - `orchestrator.py`: non-baseline compile pipeline orchestration with injectable steps. - `pipeline.py` / `cut_strategy.py` / `ssa_transform.py` / `baseline.py`: extracted behavior-preserving diff --git a/core/python_compiling_service/src/compiler/__init__.py b/core/python_compiling_service/src/compiler/__init__.py index f4964e7d9c..3d12965440 100644 --- a/core/python_compiling_service/src/compiler/__init__.py +++ b/core/python_compiling_service/src/compiler/__init__.py @@ -30,6 +30,7 @@ from .splitter import apply_loop_transformation_to_process_table, generate_proce from .ssa_core import SSA, SSATransformer from .ssa_transform import convert_ssa_to_self from .type_inference import infer_types_from_code +from .use_cases import BASELINE_REFERENCE_UDF, RECOMMENDED_AUTO_CUT_UDF __all__ = [ "CompileRequest", @@ -71,4 +72,6 @@ __all__ = [ "SSA", "SSATransformer", "convert_ssa_to_self", + "RECOMMENDED_AUTO_CUT_UDF", + "BASELINE_REFERENCE_UDF", ] diff --git a/core/python_compiling_service/src/compiler/use_cases.py b/core/python_compiling_service/src/compiler/use_cases.py new file mode 100644 index 0000000000..397b1612d2 --- /dev/null +++ b/core/python_compiling_service/src/compiler/use_cases.py @@ -0,0 +1,21 @@ +"""Canonical sample inputs for integration demos and regression checks.""" + +RECOMMENDED_AUTO_CUT_UDF = """import pandas as pd + +def score_events(df_events: pd.DataFrame, df_weights: pd.DataFrame) -> pd.DataFrame: + active = df_events[df_events["event"] != "idle"] + joined = pd.merge(active, df_weights, on="event", how="left") + joined["score"] = joined["count"] * joined["weight"].fillna(0) + result = joined[["user_id", "event", "score"]] + return result +""" + + +BASELINE_REFERENCE_UDF = """#baseline +import pandas as pd + +def score_events_baseline(df_events: pd.DataFrame, df_weights: pd.DataFrame) -> pd.DataFrame: + joined = pd.merge(df_events, df_weights, on="event", how="left") + joined["score"] = joined["count"] * joined["weight"].fillna(0) + return joined[["user_id", "event", "score"]] +""" diff --git a/core/python_compiling_service/src/port_detector.py b/core/python_compiling_service/src/port_detector.py index d67ebaa51f..05619ce510 100644 --- a/core/python_compiling_service/src/port_detector.py +++ b/core/python_compiling_service/src/port_detector.py @@ -1,17 +1,17 @@ import ast -# import astviz # Only import in __main__ +from collections import defaultdict def detect_ports_and_classify_statements(source_code: str): + """Infer statement-to-port ownership from a simple AST walk.""" tree = ast.parse(source_code) - # astviz.view(tree) # Remove from here fn_def = tree.body[0] - - # Step 1: Identify input arguments -> ports + + # Step 1: Identify input arguments -> ports. arg_ports = [arg.arg for arg in fn_def.args.args] port_origin = {arg: f"{arg}_port" for arg in arg_ports} var_to_port = {} - # Step 2: Direct unpacking from input (e.g., X_train, y_train = train_set) + # Step 2: Direct unpacking from input (e.g., X_train, y_train = train_set). for stmt in fn_def.body: if isinstance(stmt, ast.Assign) and isinstance(stmt.value, ast.Name): source = stmt.value.id @@ -22,7 +22,7 @@ def detect_ports_and_classify_statements(source_code: str): if isinstance(elt, ast.Name): var_to_port[elt.id] = port_origin[source] - # Step 3: Propagate port origin to derived variables + # Step 3: Propagate port origin to derived variables. def propagate_variable_origins(fn_body): for stmt in fn_body: lhs_vars = set() @@ -41,7 +41,7 @@ def detect_ports_and_classify_statements(source_code: str): propagate_variable_origins(fn_def.body) - # Step 4: Classify each statement + # Step 4: Classify each statement. stmt_to_port = [] for stmt in fn_def.body: stmt_vars = {node.id for node in ast.walk(stmt) if isinstance(node, ast.Name)} @@ -59,12 +59,10 @@ def detect_ports_and_classify_statements(source_code: str): return port_origin, stmt_to_port -import pandas as pd -import astroid - -from collections import defaultdict def build_dependency_graph(function_node): + import astroid + dependency_graph = defaultdict(set) for stmt in function_node.body: if isinstance(stmt, astroid.Assign): @@ -78,6 +76,7 @@ def build_dependency_graph(function_node): dependency_graph[node.name].update(rhs_vars) return dependency_graph + def backward_propagate_ports(var_port_map, dependency_graph): updated = True while updated: @@ -85,36 +84,43 @@ def backward_propagate_ports(var_port_map, dependency_graph): for var, deps in dependency_graph.items(): dep_ports = {var_port_map.get(dep) for dep in deps if dep in var_port_map} dep_ports.discard(None) - if len(dep_ports) == 1 and (var not in var_port_map or var_port_map[var] != next(iter(dep_ports))): + if len(dep_ports) == 1 and ( + var not in var_port_map or var_port_map[var] != next(iter(dep_ports)) + ): var_port_map[var] = next(iter(dep_ports)) updated = True + def label_statements_by_port(code: str): import astroid import pandas as pd + module = astroid.parse(code) function_node = next((n for n in module.body if isinstance(n, astroid.FunctionDef)), None) if function_node is None: raise ValueError("No function definition found.") - # Step 1: Arguments as ports + # Step 1: Arguments as ports. arg_names = [arg.name for arg in function_node.args.args] arg_port_map = {arg: f"{arg}_port" for arg in arg_names} - var_port_map = dict(arg_port_map) # Dye map: variable -> port or 'shared' + var_port_map = dict(arg_port_map) # variable -> port or "shared" statement_ports = [] for stmt in function_node.body: - # Find variables being assigned (Store) assigned_vars = {node.name for node in stmt.nodes_of_class(astroid.AssignName)} - # Find variables being loaded (used) loaded_vars = {node.name for node in stmt.nodes_of_class(astroid.Name)} loaded_vars -= assigned_vars - # Determine the port(s) of the loaded variables + loaded_arg_ports = {arg_port_map.get(var) for var in loaded_vars if var in arg_port_map} loaded_arg_ports.discard(None) - loaded_local_ports = {var_port_map.get(var) for var in loaded_vars if var in var_port_map and var not in arg_port_map} + loaded_local_ports = { + var_port_map.get(var) + for var in loaded_vars + if var in var_port_map and var not in arg_port_map + } loaded_local_ports.discard(None) - # 染色逻辑:优先 argument 的 port + + # Coloring rule: prioritize argument-origin ports. for var in assigned_vars: if loaded_arg_ports: if len(loaded_arg_ports) == 1: @@ -126,8 +132,7 @@ def label_statements_by_port(code: str): var_port_map[var] = next(iter(loaded_local_ports)) else: var_port_map[var] = "shared" - # else: do not assign if no clear port - # Assign statement to port + if assigned_vars: stmt_ports = {var_port_map.get(var) for var in assigned_vars if var in var_port_map} stmt_ports.discard(None) @@ -138,7 +143,6 @@ def label_statements_by_port(code: str): else: label = "global" else: - # If only loading, assign to the port(s) of the loaded variables all_ports = loaded_arg_ports | loaded_local_ports if len(all_ports) == 1: label = next(iter(all_ports)) @@ -146,25 +150,22 @@ def label_statements_by_port(code: str): label = "shared" else: label = "global" + statement_ports.append((stmt.lineno, stmt.as_string(), label)) df = pd.DataFrame(statement_ports, columns=["Line", "Statement", "Port Assignment"]) - # --- Backward propagation for global assignments --- - # 1. Find all variables assigned as global + # Backward propagation for global assignments. global_vars = set() stmt_lineno_to_var = {} - for i, (lineno, stmt_str, label) in enumerate(statement_ports): + for i, (lineno, _stmt_str, label) in enumerate(statement_ports): if label == "global": - # Try to extract assigned variable(s) from the statement string - # (astroid already gives us assigned_vars per statement) stmt = function_node.body[i] assigned_vars = {node.name for node in stmt.nodes_of_class(astroid.AssignName)} for var in assigned_vars: global_vars.add(var) stmt_lineno_to_var[lineno] = var - # 2. For each global var, find which port's statements use it var_used_by_ports = {var: set() for var in global_vars} for i, stmt in enumerate(function_node.body): loaded_vars = {node.name for node in stmt.nodes_of_class(astroid.Name)} @@ -174,218 +175,17 @@ def label_statements_by_port(code: str): if port_label not in ("global", "shared"): var_used_by_ports[var].add(port_label) - # 3. Update variable and assignment statement port if only used by one port for var, ports in var_used_by_ports.items(): if len(ports) == 1: port = next(iter(ports)) - # Update all assignment statements for this var for i, (lineno, stmt_str, label) in enumerate(statement_ports): if stmt_lineno_to_var.get(lineno) == var and label == "global": statement_ports[i] = (lineno, stmt_str, port) - # Update var_port_map for this var var_port_map[var] = port elif len(ports) > 1: - # If used by multiple ports, mark as shared for i, (lineno, stmt_str, label) in enumerate(statement_ports): if stmt_lineno_to_var.get(lineno) == var and label == "global": statement_ports[i] = (lineno, stmt_str, "shared") var_port_map[var] = "shared" - # else: if not used, keep as global - - df = pd.DataFrame(statement_ports, columns=["Line", "Statement", "Port Assignment"]) - return df - -if __name__ == "__main__": -# source_code = """ -# def train_and_evaluate_model(train_set, test_set): -# X_train, y_train = train_set -# X_test, y_test = test_set - -# model = LogisticRegression() -# model.fit(X_train, y_train) - -# predictions = [] -# for x in X_test: -# y_pred = model.predict(x)[0] -# predictions.append(y_pred) - -# correct = 0 -# total = len(y_test) -# for i, y_true in enumerate(y_test): -# if predictions[i] == y_true: -# correct += 1 - -# accuracy = correct / total -# return model, predictions, accuracy -# """ - - source_code = """ -def f(x, y): - model = train(x) - yield (model, "model") - - for t in y: - z = model.predict(t) - yield z, "test" -""" - # Print the AST of the source code - import ast - print(ast.dump(ast.parse(source_code), indent=4)) - - # Visualize the AST using graphviz - from graphviz import Digraph - def ast_to_graphviz(node, graph=None, parent=None): - import ast - import hashlib - # Color palette for variable nodes - color_palette = [ - 'lightblue', 'lightgreen', 'yellow', 'orange', 'pink', 'violet', 'gold', 'cyan', 'magenta', 'salmon', 'khaki', 'plum', 'wheat', 'tan', 'thistle', 'azure', 'beige', 'coral', 'ivory', 'lavender' - ] - - if graph is None: - graph = Digraph() - node_id = str(id(node)) - # Show variable names for ast.Name nodes, with color - if isinstance(node, ast.Name): - label = f"Name: {node.id}" - # Assign a color based on variable name - color_idx = int(hashlib.md5(node.id.encode()).hexdigest(), 16) % len(color_palette) - color = color_palette[color_idx] - graph.node(node_id, label, style='filled', fillcolor=color) - else: - label = type(node).__name__ - graph.node(node_id, label) - if parent: - graph.edge(str(id(parent)), node_id) - for child in ast.iter_child_nodes(node): - ast_to_graphviz(child, graph, node) - return graph - - tree = ast.parse(source_code) - # graph = ast_to_graphviz(tree) - # graph.render('ast_output', view=True) # This will create ast_output.pdf and open it - graph.render('output/ast_output', view=True) # This will create ast_output.pdf and open it - - port_origin, stmt_to_port = detect_ports_and_classify_statements(source_code) - # print(port_origin) - # print(stmt_to_port) - - df = label_statements_by_port(source_code) - print(df) - - # --- Variable Dependency Graph --- - def build_variable_dependency_graph(source_code): - import ast - from graphviz import Digraph - tree = ast.parse(source_code) - fn_def = next((n for n in tree.body if isinstance(n, ast.FunctionDef)), None) - if fn_def is None: - raise ValueError("No function definition found.") - dep_graph = {} # var: set of vars it depends on - # Add arguments as nodes - for arg in fn_def.args.args: - dep_graph[arg.arg] = set() - # Helper to process a list of statements recursively - def process_body(body): - for stmt in body: - # Assignments - if isinstance(stmt, ast.Assign): - lhs_vars = set() - for target in stmt.targets: - if isinstance(target, ast.Name): - lhs_vars.add(target.id) - elif isinstance(target, ast.Tuple): - for elt in target.elts: - if isinstance(elt, ast.Name): - lhs_vars.add(elt.id) - rhs_vars = {n.id for n in ast.walk(stmt.value) if isinstance(n, ast.Name)} - for lhs in lhs_vars: - if lhs not in dep_graph: - dep_graph[lhs] = set() - dep_graph[lhs].update(rhs_vars) - # For loops (target depends on iter), and process body recursively - elif isinstance(stmt, ast.For): - if isinstance(stmt.target, ast.Name): - iter_vars = {n.id for n in ast.walk(stmt.iter) if isinstance(n, ast.Name)} - dep_graph.setdefault(stmt.target.id, set()).update(iter_vars) - process_body(stmt.body) - # If, While, With, etc. (process their bodies recursively) - elif hasattr(stmt, 'body') and isinstance(stmt.body, list): - process_body(stmt.body) - # Also process orelse blocks if present - if hasattr(stmt, 'orelse') and isinstance(stmt.orelse, list): - process_body(stmt.orelse) - process_body(fn_def.body) - # Visualize - g = Digraph(comment="Variable Dependency Graph") - for var in dep_graph: - g.node(var) - for var, deps in dep_graph.items(): - for dep in deps: - g.edge(dep, var) - return g - - # Build and visualize the variable dependency graph - # var_graph = build_variable_dependency_graph(source_code) - # var_graph.render('variable_dependency_graph', view=True) - var_graph = build_variable_dependency_graph(source_code) - var_graph.render('output/variable_dependency_graph', view=True) - - import ast - -class SSATransformer(ast.NodeTransformer): - def __init__(self): - self.version = {} - - def fresh(self, var): - self.version[var] = self.version.get(var, 0) + 1 - return f"{var}_{self.version[var]}" - - def visit_Assign(self, node): - self.generic_visit(node) - if isinstance(node.targets[0], ast.Name): - var = node.targets[0].id - node.targets[0].id = self.fresh(var) - return node - - def visit_Name(self, node): - if isinstance(node.ctx, ast.Load) and node.id in self.version: - node.id = f"{node.id}_{self.version[node.id]}" - return node - - -tree = ast.parse(source_code) -ssa_tree = SSATransformer().visit(tree) -import astor -print(astor.to_source(ssa_tree)) - -def build_ssa_dependency_graph_with_lineno(tree): - dep_graph = {} - node_labels = {} - for node in ast.walk(tree): - if isinstance(node, ast.Assign) and isinstance(node.targets[0], ast.Name): - lhs = node.targets[0].id - lineno = getattr(node, 'lineno', None) - label = f"{lhs} (line {lineno})" if lineno else lhs - node_labels[lhs] = label - rhs_vars = {n.id for n in ast.walk(node.value) if isinstance(n, ast.Name)} - dep_graph[lhs] = (rhs_vars, lineno) - return dep_graph, node_labels - -def visualize_dep_graph_with_lineno(dep_graph, node_labels): - from graphviz import Digraph - g = Digraph(comment="SSA Variable Dependency Graph with Line Numbers") - for var, label in node_labels.items(): - g.node(var, label) - for var, (deps, _) in dep_graph.items(): - for dep in deps: - if dep in node_labels: - g.edge(dep, var) - g.render('ssa_variable_dependency_graph', view=True) - g.render('output/ssa_variable_dependency_graph', view=True) -# Example usage: -tree = ast.parse(source_code) -ssa_tree = SSATransformer().visit(tree) -dep_graph, node_labels = build_ssa_dependency_graph_with_lineno(ssa_tree) -visualize_dep_graph_with_lineno(dep_graph, node_labels) \ No newline at end of file + return pd.DataFrame(statement_ports, columns=["Line", "Statement", "Port Assignment"]) diff --git a/core/python_compiling_service/src/udf_compiling_service.py b/core/python_compiling_service/src/udf_compiling_service.py index 14b51473af..b65e35a48c 100644 --- a/core/python_compiling_service/src/udf_compiling_service.py +++ b/core/python_compiling_service/src/udf_compiling_service.py @@ -15,7 +15,12 @@ import os # Add the src directory to the path so we can import our modules sys.path.append(os.path.dirname(os.path.abspath(__file__))) -from compiler import compile_udf, infer_line_number_from_code +from compiler import ( + BASELINE_REFERENCE_UDF, + RECOMMENDED_AUTO_CUT_UDF, + compile_udf, + infer_line_number_from_code, +) app = Flask(__name__) CORS(app) # Enable CORS for all routes @@ -107,60 +112,24 @@ def compile_code_get(): @app.route('/example', methods=['GET']) def get_example(): """Get an example of the expected request format.""" - - # Example 1: Normal compilation with specific line cut - example_code_with_cut = '''#5 -import pandas as pd - -def enrich_and_score(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame: - # Step 1: Filter df1 - df1_filtered = df1[df1['activity'] != 'idle'] - - # Step 2: Merge df1 with df2 on user_id - merged = pd.merge(df1_filtered, df2, on='user_id', how='inner') - - # Step 3: Define a simple activity -> value mapping - activity_points = { - 'login': 1, - 'logout': 0.5, - 'purchase': 5, - 'comment': 2 - } - - # Step 4: Compute score = activity_value * group weight - merged['activity_value'] = merged['activity'].map(activity_points).fillna(0) - merged['score'] = merged['activity_value'] * merged['weight'] - - return merged[['user_id', 'activity', 'group', 'score']]''' - - # Example 2: Baseline compilation - example_code_baseline = '''#baseline -import pandas as pd -def simple_function(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame: - merged = pd.merge(df1, df2, on='user_id', how='inner') - return merged[['user_id', 'activity', 'score']]''' - - # Example 3: Auto compilation (no line number) - example_code_auto = '''import pandas as pd + # Stable examples live under compiler/use_cases.py so docs, API, and tests stay in sync. + example_code_with_cut = f"#5\n{RECOMMENDED_AUTO_CUT_UDF}" + example_code_baseline = BASELINE_REFERENCE_UDF + example_code_auto = RECOMMENDED_AUTO_CUT_UDF -def auto_compile(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame: - df1_filtered = df1[df1['activity'] != 'idle'] - merged = pd.merge(df1_filtered, df2, on='user_id', how='inner') - return merged[['user_id', 'activity', 'score']]''' - examples = { 'example_with_line_cut': { 'code': example_code_with_cut, - 'description': 'Compilation with specific line cut. The first line "#5" specifies to cut at line 5.' + 'description': 'Compile with an explicit cut on line 5 to force process-table split position.' }, 'example_baseline': { 'code': example_code_baseline, - 'description': 'Baseline compilation. The first line "#baseline" creates a single process_tables method.' + 'description': 'Baseline mode. "#baseline" forces a single process_tables method.' }, 'example_auto': { 'code': example_code_auto, - 'description': 'Auto compilation. No line number specified, will use optimal cuts based on dependency analysis.' + 'description': 'Recommended default mode. No line prefix; compiler picks the cut automatically.' } } diff --git a/core/python_compiling_service/tests/test_good_use_case.py b/core/python_compiling_service/tests/test_good_use_case.py new file mode 100644 index 0000000000..0ffae1ad4b --- /dev/null +++ b/core/python_compiling_service/tests/test_good_use_case.py @@ -0,0 +1,30 @@ +import os +import sys + +# Add the project root to the Python path +project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, project_root) + +from src.compiler import RECOMMENDED_AUTO_CUT_UDF, compile_udf + + +def test_recommended_use_case_auto_cut_compile_shape(): + result = compile_udf(RECOMMENDED_AUTO_CUT_UDF) + + assert result.num_args == 2 + assert result.baseline_mode is False + + assert result.ranked_cuts, "Expected at least one ranked cut for the recommended use case" + assert result.cuts_used, "Expected at least one selected cut for the recommended use case" + + ranked_lines = {cut["line_number"] for cut in result.ranked_cuts} + selected_line = result.cuts_used[0]["line_number"] + assert selected_line in ranked_lines + + assert "class Operator(UDFGeneralOperator):" in result.operator_class + assert "def process_table_0(" in result.operator_class + assert "def process_table_1(" in result.operator_class + assert "yield {'result': self.result}" in result.operator_class + + assert "process_table_0" in result.process_tables + assert "process_table_1" in result.process_tables diff --git a/core/python_compiling_service/tests/test_service_api.py b/core/python_compiling_service/tests/test_service_api.py index fdfa11948b..7b3069a71b 100644 --- a/core/python_compiling_service/tests/test_service_api.py +++ b/core/python_compiling_service/tests/test_service_api.py @@ -7,6 +7,7 @@ project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, project_root) import src.udf_compiling_service as service +from src.compiler import BASELINE_REFERENCE_UDF, RECOMMENDED_AUTO_CUT_UDF def test_post_compile_success(monkeypatch): @@ -72,3 +73,13 @@ def test_get_compile_internal_error(monkeypatch): resp = client.get("/compile", query_string={"code": "a=1"}) assert resp.status_code == 500 assert "Internal server error: boom" in resp.get_data(as_text=True) + + +def test_example_endpoint_uses_canonical_use_cases(): + client = service.app.test_client() + resp = client.get("/example") + + assert resp.status_code == 200 + payload = resp.get_json() + assert payload["examples"]["example_auto"]["code"] == RECOMMENDED_AUTO_CUT_UDF + assert payload["examples"]["example_baseline"]["code"] == BASELINE_REFERENCE_UDF
