This is an automated email from the ASF dual-hosted git repository.
bobbai00 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 98ba87eeb7 feat(agent-service): enable Texera Agent to do workflow
editing and execution (#4540)
98ba87eeb7 is described below
commit 98ba87eeb7848edbe94abcdcce930b5ed9f7aa27
Author: Jiadong Bai <[email protected]>
AuthorDate: Mon Apr 27 22:38:54 2026 -0700
feat(agent-service): enable Texera Agent to do workflow editing and
execution (#4540)
<!--
Thanks for sending a pull request (PR)! Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
[Contributing to
Texera](https://github.com/apache/texera/blob/main/CONTRIBUTING.md)
2. Ensure you have added or run the appropriate tests for your PR
3. If the PR is work in progress, mark it a draft on GitHub.
4. Please write your PR title to summarize what this PR proposes, we
are following Conventional Commits style for PR titles as well.
5. Be sure to keep the PR description updated to reflect all changes.
-->
### What changes were proposed in this PR?
<!--
Please clarify what changes you are proposing. The purpose of this
section
is to outline the changes. Here are some tips for you:
1. If you propose a new API, clarify the use case for a new API.
2. If you fix a bug, you can clarify why it is a bug.
3. If it is a refactoring, clarify what has been changed.
4. It would be helpful to include a before-and-after comparison using
screenshots or GIFs.
5. Please consider writing useful notes for better and faster reviews.
-->
This PR adds tools that let the Texera Agent perform CRUD operations on a
workflow and execute the workflow to inspect the execution results.
### Any related issues, documentation, discussions?
<!--
Please use this section to link other resources if not mentioned
already.
1. If this PR fixes an issue, please include `Fixes #1234`, `Resolves
#1234`
or `Closes #1234`. If it is only related, simply mention the issue
number.
2. If there is design documentation, please add the link.
3. If there is a discussion in the mailing list, please add the link.
-->
Closes #4493
### How was this PR tested?
<!--
If tests were added, say they were added here. Or simply mention that if
the PR
is tested with existing test cases. Make sure to include/update test
cases that
check the changes thoroughly including negative and positive cases if
possible.
If it was tested in a way different from regular unit tests, please
clarify how
you tested step by step, ideally copy and paste-able, so that other
reviewers can
test and check, and descendants can verify in the future. If tests were
not added,
please describe why they were not added and/or why it was difficult to
add.
-->
Manually tested
### Was this PR authored or co-authored using generative AI tooling?
<!--
If generative AI tooling has been used in the process of authoring this
PR,
please include the phrase: 'Generated-by: ' followed by the name of the
tool
and its version. If no, write 'No'.
Please refer to the [ASF Generative Tooling
Guidance](https://www.apache.org/legal/generative-tooling.html) for
details.
-->
No
---------
Co-authored-by: Chen Li <[email protected]>
Co-authored-by: Xinyuan Lin <[email protected]>
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
agent-service/src/agent/texera-agent.ts | 110 ++-
agent-service/src/agent/tools/index.ts | 2 +
agent-service/src/agent/tools/result-formatting.ts | 138 ++++
.../src/agent/tools/workflow-crud-tools.ts | 346 ++++++++
.../src/agent/tools/workflow-execution-tools.ts | 604 ++++++++++++++
agent-service/src/agent/util/auto-layout.ts | 70 ++
.../src/agent/workflow-result-state.test.ts | 95 +++
agent-service/src/agent/workflow-result-state.ts | 83 ++
.../src/{types/index.ts => api/execution-api.ts} | 21 +-
agent-service/src/api/index.ts | 1 +
agent-service/src/index.ts | 1 +
agent-service/src/server.ts | 35 +-
agent-service/src/types/agent.ts | 2 +
agent-service/src/types/{index.ts => execution.ts} | 36 +-
agent-service/src/types/index.ts | 1 +
.../apache/texera/web/ComputingUnitMaster.scala | 7 +-
.../web/resource/SyncExecutionResource.scala | 900 +++++++++++++++++++++
17 files changed, 2438 insertions(+), 14 deletions(-)
diff --git a/agent-service/src/agent/texera-agent.ts
b/agent-service/src/agent/texera-agent.ts
index 24e7a5bfba..37eb12d868 100644
--- a/agent-service/src/agent/texera-agent.ts
+++ b/agent-service/src/agent/texera-agent.ts
@@ -22,6 +22,8 @@ import { Subscription } from "rxjs";
import { debounceTime } from "rxjs/operators";
import { WorkflowState } from "./workflow-state";
import { WorkflowSystemMetadata } from "./util/workflow-system-metadata";
+import { WorkflowResultState } from "./workflow-result-state";
+import { formatOperatorResult } from "./tools/result-formatting";
import type { AgentSettings, ReActStep, TokenUsage, UserInfo } from
"../types/agent";
import {
AgentState as AgentStateEnum,
@@ -30,6 +32,21 @@ import {
INITIAL_STEP_ID,
} from "../types/agent";
import { buildSystemPrompt } from "./prompts";
+import {
+ createAddOperatorTool,
+ createModifyOperatorTool,
+ createDeleteOperatorTool,
+ TOOL_NAME_ADD_OPERATOR,
+ TOOL_NAME_MODIFY_OPERATOR,
+ TOOL_NAME_DELETE_OPERATOR,
+ type ToolContext,
+} from "./tools/workflow-crud-tools";
+import {
+ createExecuteOperatorTool,
+ executeOperatorAndFormat,
+ TOOL_NAME_EXECUTE_OPERATOR,
+ type ExecutionConfig,
+} from "./tools/workflow-execution-tools";
import { assembleContext } from "./util/context-utils";
import { compileWorkflowAsync, type WorkflowCompilationResponse } from
"../api/compile-api";
import { createLogger } from "../logger";
@@ -60,7 +77,7 @@ type ReActStepCallback = (step: ReActStep) => void;
*
* Owns the conversation (ReAct step tree with HEAD/checkout semantics), the
* workflow being edited (`WorkflowState`), cached operator execution results
- * and the tool surface exposed to the LLM. Each call
+ * (`WorkflowResultState`), and the tool surface exposed to the LLM. Each call
* to `sendMessage` drives one multi-step generation via the Vercel AI SDK,
* streaming step updates to subscribed websockets.
*/
@@ -76,6 +93,7 @@ export class TexeraAgent {
private head: string = INITIAL_STEP_ID;
private stepsById: Map<string, ReActStep> = new Map();
private stepCounter = 0;
+ private workflowResultState: WorkflowResultState;
private websockets: Set<any> = new Set();
@@ -118,6 +136,7 @@ export class TexeraAgent {
this.workflowState = new WorkflowState();
this.metadataStore = WorkflowSystemMetadata.getInstance();
+ this.workflowResultState = new WorkflowResultState(() =>
this.getAncestorPath());
const initialStep: ReActStep = {
id: INITIAL_STEP_ID,
@@ -160,8 +179,56 @@ export class TexeraAgent {
this.settings.systemPrompt = this.systemPrompt;
}
+  /**
+   * Assemble the settings needed to run an operator on the backend.
+   *
+   * Identity fields come from the delegate configuration and the limits and
+   * timeout from the agent settings. Returns `undefined` when no delegate
+   * configuration is present.
+   */
+  private buildExecutionConfig(): ExecutionConfig | undefined {
+    const delegate = this.delegateConfig;
+    if (!delegate) {
+      return undefined;
+    }
+
+    const { userToken, workflowId, computingUnitId } = delegate;
+    const limits = this.settings;
+    return {
+      userToken,
+      workflowId,
+      computingUnitId,
+      maxOperatorResultCharLimit: limits.maxOperatorResultCharLimit,
+      maxOperatorResultCellCharLimit: limits.maxOperatorResultCellCharLimit,
+      executionTimeoutMs: limits.executionTimeoutMs,
+    };
+  }
+
+ /**
+ * Build the tool set exposed to the LLM, keyed by tool name.
+ *
+ * The add/modify/delete operator tools are always registered. The execute
+ * tool is registered only when a delegate configuration is present.
+ */
private createTools(): Record<string, any> {
- return {};
+ // Collect schema + additional metadata for every operator type that has a schema.
+ const operatorSchemas = new Map<string, any>();
+ for (const type of Object.keys(this.metadataStore.getAllOperatorTypes())) {
+ const jsonSchema = this.metadataStore.getSchema(type);
+ const additionalMetadata =
this.metadataStore.getAdditionalMetadata(type);
+ if (jsonSchema) {
+ operatorSchemas.set(type, { jsonSchema, additionalMetadata });
+ }
+ }
+
+ // Lazy getter so the execute tool reads the settings current at call time.
+ const getExecutionConfig = this.delegateConfig ? () =>
this.buildExecutionConfig()! : undefined;
+
+ const context: ToolContext = {
+ metadataStore: this.metadataStore,
+ settings: {
+ maxOperatorResultCharLimit: this.settings.maxOperatorResultCharLimit,
+ toolTimeoutMs: this.settings.toolTimeoutMs,
+ executionTimeoutMs: this.settings.executionTimeoutMs,
+ },
+ };
+
+ const tools: Record<string, any> = {
+ [TOOL_NAME_DELETE_OPERATOR]:
createDeleteOperatorTool(this.workflowState, context),
+ [TOOL_NAME_ADD_OPERATOR]: createAddOperatorTool(this.workflowState,
operatorSchemas, context),
+ [TOOL_NAME_MODIFY_OPERATOR]:
createModifyOperatorTool(this.workflowState, context),
+ };
+
+ if (getExecutionConfig) {
+ tools[TOOL_NAME_EXECUTE_OPERATOR] = createExecuteOperatorTool(
+ this.workflowState,
+ getExecutionConfig,
+ // Cache each result under the step that is HEAD when it arrives.
+ (opId, operatorInfo) => {
+ this.workflowResultState.set(opId, this.head, operatorInfo);
+ }
+ );
+ }
+
+ return tools;
}
getState(): AgentStateEnum {
@@ -195,6 +262,10 @@ export class TexeraAgent {
return this.stepsById;
}
+ /** Expose the store of cached operator execution results held by this agent. */
+ getWorkflowResultState(): WorkflowResultState {
+ return this.workflowResultState;
+ }
+
getWebsockets(): Set<any> {
return this.websockets;
}
@@ -541,6 +612,34 @@ export class TexeraAgent {
this.addStep(agentStep);
this.head = agentStepId;
+      // After a step that added or modified operators, execute each touched
+      // operator so its result is cached for the next prompt.
+      const execConfig = this.buildExecutionConfig();
+      if (execConfig && toolCalls && toolResults) {
+        const EXECUTE_AFTER_TOOLS = new Set([TOOL_NAME_ADD_OPERATOR, TOOL_NAME_MODIFY_OPERATOR]);
+
+        // Pair every call with its own result by id rather than by position:
+        // if any call in the step produced no result, positional pairing would
+        // attach a later call's output to an earlier call.
+        // NOTE(review): assumes the SDK's tool calls/results carry `toolCallId` — confirm.
+        const resultsByCallId = new Map<string, any>();
+        for (const result of toolResults as any[]) {
+          if (result?.toolCallId !== undefined) {
+            resultsByCallId.set(result.toolCallId, result);
+          }
+        }
+
+        for (let i = 0; i < toolCalls.length; i++) {
+          const tc = toolCalls[i] as any;
+          if (!EXECUTE_AFTER_TOOLS.has(tc.toolName)) continue;
+
+          // Positional pairing is kept only as a fallback for calls without an id.
+          const tr = tc.toolCallId !== undefined ? resultsByCallId.get(tc.toolCallId) : (toolResults as any[])[i];
+          // No result means the edit never ran, so there is nothing new to execute.
+          if (!tr) continue;
+
+          const resultText = typeof tr.output === "string" ? tr.output : String(tr.output ?? "");
+          if (resultText.startsWith("[ERROR]")) continue;
+
+          const operatorId = tc.input?.operatorId;
+          if (!operatorId) continue;
+
+          try {
+            await executeOperatorAndFormat(this.workflowState, execConfig, operatorId, {
+              abortSignal: this.abortController?.signal,
+              onResult: (opId, operatorInfo) => {
+                this.workflowResultState.set(opId, this.head, operatorInfo);
+              },
+            });
+          } catch (e: any) {
+            // Best effort: a failed auto-execution must not abort the agent step.
+            this.log.warn({ operatorId, err: e?.message || e }, "post-step execution failed");
+          }
+        }
+      }
+
+
beforeStepContent = afterStepContent;
isFirstStep = false;
},
@@ -626,7 +725,12 @@ export class TexeraAgent {
}
+ /**
+ * Format every currently visible cached operator result as text,
+ * keyed by operator id.
+ */
private getFormattedResultsForDAG(): Map<string, string> {
- return new Map();
+ const result = new Map<string, string>();
+ const visible = this.workflowResultState.getAllVisible();
+ for (const [operatorId, entry] of visible) {
+ result.set(operatorId, formatOperatorResult(operatorId,
entry.operatorInfo, this.workflowState));
+ }
+ return result;
}
stop(): void {
diff --git a/agent-service/src/agent/tools/index.ts
b/agent-service/src/agent/tools/index.ts
index 84173d931d..7e2d957070 100644
--- a/agent-service/src/agent/tools/index.ts
+++ b/agent-service/src/agent/tools/index.ts
@@ -18,3 +18,5 @@
*/
export * from "./tools-utility";
+export * from "./workflow-crud-tools";
+export * from "./workflow-execution-tools";
diff --git a/agent-service/src/agent/tools/result-formatting.ts
b/agent-service/src/agent/tools/result-formatting.ts
new file mode 100644
index 0000000000..9a11ba5085
--- /dev/null
+++ b/agent-service/src/agent/tools/result-formatting.ts
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import type { OperatorInfo } from "../../types/execution";
+import type { WorkflowState } from "../workflow-state";
+import { formatExecuteOperatorResult } from "./tools-utility";
+
+/**
+ * Turn the execution outcome of one operator into the text shown to the agent.
+ *
+ * An error short-circuits to an `[ERROR]` line, and a missing or non-array
+ * result to a placeholder. Otherwise the output is the brief summary, the
+ * input/output shape metadata, any warnings, and the tab-separated rows,
+ * joined by newlines with empty pieces dropped. For visualization rows the
+ * `html-content` / `json-content` payloads are replaced by a placeholder.
+ */
+export function formatOperatorResult(operatorId: string, opInfo: OperatorInfo, workflowState: WorkflowState): string {
+  if (opInfo.error) return `[ERROR] ${opInfo.error}`;
+  if (!opInfo.result || !Array.isArray(opInfo.result)) return "(no result data)";
+
+  const rows = opInfo.result as Record<string, any>[];
+  const hasRows = rows.length > 0;
+
+  // The column count ignores the bookkeeping keys carried on each row.
+  let columnCount = 0;
+  if (hasRows) {
+    for (const key of Object.keys(rows[0])) {
+      if (key !== "__row_index__" && key !== "__is_visualization__") columnCount++;
+    }
+  }
+
+  const isVisualization = hasRows && rows[0]["__is_visualization__"] === true;
+
+  // Drop the marker key and blank out the (potentially huge) rendered payloads.
+  const scrubVisualizationRow = (row: Record<string, any>): Record<string, any> => {
+    const scrubbed: Record<string, any> = {};
+    for (const key of Object.keys(row)) {
+      if (key === "__is_visualization__") continue;
+      const isPayload = key === "html-content" || key === "json-content";
+      scrubbed[key] = isPayload ? "<skipped: visualization content>" : row[key];
+    }
+    return scrubbed;
+  };
+  const printableRows = isVisualization ? rows.map(row => scrubVisualizationRow(row)) : rows;
+
+  const table = jsonToTableFormat(printableRows);
+  const shapeInfo = formatInputOutputMetadata(workflowState, operatorId, opInfo, columnCount);
+  const warnings = opInfo.warnings ?? [];
+  const summary = formatExecuteOperatorResult(operatorId);
+
+  return [summary, shapeInfo, ...warnings, table].filter(Boolean).join("\n");
+}
+
+/**
+ * Describe the shapes of an operator's output and, when known, of its inputs.
+ *
+ * @param workflowState  workflow used to resolve which upstream operator feeds each input port
+ * @param operatorId     operator whose result is being described
+ * @param opInfo         execution info; reads `totalRowCount`, `outputTuples` and `inputPortShapes`
+ * @param outputColumns  number of user-visible output columns
+ * @returns one line with the output shape, preceded by an input-shapes line when
+ *          `inputPortShapes` is non-empty
+ */
+function formatInputOutputMetadata(
+  workflowState: WorkflowState,
+  operatorId: string,
+  opInfo: OperatorInfo,
+  outputColumns: number
+): string {
+  const outputRows = opInfo.totalRowCount ?? opInfo.outputTuples;
+  const outputLine = `Output table shape: (${outputRows}, ${outputColumns})`;
+
+  const inputShapes = opInfo.inputPortShapes;
+  if (!inputShapes || inputShapes.length === 0) {
+    return outputLine;
+  }
+
+  // Map each input port index to the operator linked into it, so shapes can be
+  // labelled with the upstream operator id instead of a bare port number.
+  const inputLinks = workflowState.getAllLinks().filter(l => l.target.operatorID === operatorId);
+  const portIndexToUpstream = new Map<number, string>();
+  const op = workflowState.getOperator(operatorId);
+  for (const link of inputLinks) {
+    const portIdx = op?.inputPorts.findIndex(p => p.portID === link.target.portID) ?? -1;
+    if (portIdx >= 0) {
+      portIndexToUpstream.set(portIdx, link.source.operatorID);
+    }
+  }
+
+  // Sort a copy: Array.prototype.sort works in place, and `opInfo` belongs to
+  // the caller, so formatting must not reorder its data.
+  const inputPart = [...inputShapes]
+    .sort((a, b) => a.portIndex - b.portIndex)
+    .map(p => {
+      const name = portIndexToUpstream.get(p.portIndex) ?? `input${p.portIndex}`;
+      return `${name}(${p.rows}, ${p.columns})`;
+    })
+    .join(", ");
+
+  return `Input operator(table shape): ${inputPart}\n${outputLine}`;
+}
+
+/**
+ * Render result rows as a tab-separated table with a leading row-index column.
+ *
+ * Columns are taken from the keys of the first row, excluding `__row_index__`.
+ * When rows carry `__row_index__`, a gap between consecutive indices is shown
+ * as a row of `...` cells. `null` and the string `"NULL"` are printed as
+ * `NaN`, `undefined` as an empty cell, and non-primitive values as JSON.
+ *
+ * @param jsonResult rows to render
+ * @returns the table text, or `""` when there are no rows or no columns
+ */
+function jsonToTableFormat(jsonResult: Record<string, any>[]): string {
+  if (!jsonResult || jsonResult.length === 0) return "";
+
+  const hasRowIndex = "__row_index__" in jsonResult[0];
+  const headers = Object.keys(jsonResult[0]).filter(h => h !== "__row_index__");
+  if (headers.length === 0) return "";
+
+  const headerLine = "\t" + headers.join("\t");
+  const formattedRows: string[] = [];
+  let prevIndex = -1;
+
+  for (let i = 0; i < jsonResult.length; i++) {
+    const row = jsonResult[i];
+    const rowIndex = hasRowIndex ? (row["__row_index__"] as number) : i;
+
+    // Mark rows that were skipped between the previous and the current index.
+    if (prevIndex >= 0 && rowIndex > prevIndex + 1) {
+      const dots = headers.map(() => "...").join("\t");
+      formattedRows.push(`...\t${dots}`);
+    }
+    prevIndex = rowIndex;
+
+    const cells = headers.map(h => {
+      const val = row[h];
+      if (val === null) return "NaN";
+      if (val === undefined) return "";
+      if (typeof val === "number" || typeof val === "boolean") return String(val);
+      if (typeof val === "string") {
+        if (val === "NULL") return "NaN";
+        // Escape every character that would break the one-row-per-line,
+        // tab-separated layout, including carriage returns from CRLF text.
+        return val.replace(/\t/g, "\\t").replace(/\r/g, "\\r").replace(/\n/g, "\\n");
+      }
+      return JSON.stringify(val);
+    });
+    formattedRows.push(`${rowIndex}\t${cells.join("\t")}`);
+  }
+
+  return [headerLine, ...formattedRows].join("\n");
+}
diff --git a/agent-service/src/agent/tools/workflow-crud-tools.ts
b/agent-service/src/agent/tools/workflow-crud-tools.ts
new file mode 100644
index 0000000000..54a0b29db9
--- /dev/null
+++ b/agent-service/src/agent/tools/workflow-crud-tools.ts
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { z } from "zod";
+import { tool } from "ai";
+import { WorkflowState } from "../workflow-state";
+import { autoLayoutWorkflow } from "../util/auto-layout";
+import { WorkflowUtilService } from "../util/workflow-utils";
+import type { OperatorLink } from "../../types/workflow";
+import {
+ createToolResult,
+ createErrorResult,
+ formatAddOperatorResult,
+ formatModifyOperatorResult,
+ formatOperatorError,
+} from "./tools-utility";
+import {
+ type WorkflowSystemMetadata,
+ formatValidationErrors,
+ formatCompactSchemaForError,
+} from "../util/workflow-system-metadata";
+
+/** Optional dependencies and settings handed to the workflow tool factories. */
+export interface ToolContext {
+ /** Operator metadata; used by the add/modify tools to validate properties and build new operators. */
+ metadataStore?: WorkflowSystemMetadata;
+ /** Limits and timeouts forwarded from the agent settings. */
+ settings?: {
+ maxOperatorResultCharLimit?: number;
+ toolTimeoutMs?: number;
+ executionTimeoutMs?: number;
+ };
+}
+
+export const TOOL_NAME_ADD_OPERATOR = "addOperator";
+export const TOOL_NAME_MODIFY_OPERATOR = "modifyOperator";
+export const TOOL_NAME_DELETE_OPERATOR = "deleteOperator";
+
+/**
+ * Serialize a tool call's arguments as `Input: <json>` for inclusion in
+ * error messages, leaving out arguments whose value is `undefined`.
+ */
+function formatInputArgs(args: Record<string, any>): string {
+  const provided = Object.keys(args).reduce<Record<string, any>>((kept, name) => {
+    const argValue = args[name];
+    if (argValue !== undefined) {
+      kept[name] = argValue;
+    }
+    return kept;
+  }, {});
+  return `Input: ${JSON.stringify(provided)}`;
+}
+
+/**
+ * Build the `addOperator` tool: validates the request, inserts a new operator
+ * into the workflow, links it to the requested upstream operators and
+ * re-runs auto layout.
+ *
+ * The tool is all-or-nothing: if wiring the inputs fails after the operator
+ * has been inserted, the operator (and any links already created for it) is
+ * removed again, so a reported error never leaves a half-added operator
+ * behind and the same id can be retried.
+ *
+ * @param workflowState   workflow being edited
+ * @param operatorSchemas known operator types, keyed by type name
+ * @param context         optional metadata store (required to actually create operators) and settings
+ * @returns the tool definition; its `execute` resolves to a tool result or an error result
+ */
+export function createAddOperatorTool(
+  workflowState: WorkflowState,
+  operatorSchemas: Map<string, any>,
+  context?: ToolContext
+) {
+  const workflowUtil = context?.metadataStore ? new WorkflowUtilService(context.metadataStore, workflowState) : null;
+
+  return tool({
+    description: `Add a new operator to the workflow. Use getOperatorSchema first to understand required properties.
+
+Examples:
+1. Add a source operator (no inputs):
+   { "operatorId": "op1", "operatorType": "TableFileScan", "properties": { "fileName": "data.csv" }, "summary": "Load CSV data" }
+
+2. Add an operator with input connections:
+   { "operatorId": "op2", "operatorType": "TableFilter", "properties": { "predicates": [...] }, "inputOperatorIds": { "0": ["op1"] }, "summary": "Filter rows by condition" }`,
+    inputSchema: z.object({
+      operatorId: z
+        .string()
+        .describe(
+          "Name of Operator. Use the format 'op' followed by an incrementing number starting from 1 (e.g., op1, op2, op3)."
+        ),
+      operatorType: z.string().describe("The operator type (e.g., 'DataProcessing', 'Aggregate')"),
+      properties: z.record(z.any()).describe("Properties to set on the operator"),
+      inputOperatorIds: z
+        .record(z.array(z.string()))
+        .optional()
+        .describe(
+          "Mapping from input port index to an ordered list of source operator IDs that connect to that port. " +
+            'E.g. {"0": ["opA", "opB"], "1": ["opC"]} connects opA and opB to input port 0, opC to input port 1. ' +
+            "Source operators that load files (e.g. CSVFileScan) should NOT have any input operators."
+        ),
+      summary: z.string().describe("Very brief summary of operator behavior. Within 5 words"),
+    }),
+    execute: async (args: {
+      operatorId: string;
+      operatorType: string;
+      properties?: Record<string, any>;
+      inputOperatorIds?: Record<string, string[]>;
+      summary: string;
+    }) => {
+      // Set once the operator is in the workflow, so later failures can undo it.
+      let inserted = false;
+      const undoInsert = () => {
+        if (!inserted) return;
+        inserted = false;
+        try {
+          // Removing the operator also removes the links created for it so far.
+          workflowState.deleteOperator(args.operatorId);
+        } catch {
+          // Best-effort cleanup: the original failure is what gets reported.
+        }
+      };
+
+      try {
+        const inputInfo = formatInputArgs(args);
+        const failAndUndo = (message: string) => {
+          undoInsert();
+          return createErrorResult(message);
+        };
+
+        const schemaEntry = operatorSchemas.get(args.operatorType);
+        if (!schemaEntry) {
+          return createErrorResult(
+            `Unknown operator type: "${args.operatorType}". Available types: ${[...operatorSchemas.keys()].join(", ")}. ${inputInfo}`
+          );
+        }
+
+        if (context?.metadataStore && args.properties) {
+          const validation = context.metadataStore.validateOperatorProperties(args.operatorType, args.properties);
+          if (!validation.isValid) {
+            const compactSchema = context.metadataStore.getCompactSchema(args.operatorType);
+            const schemaStr = compactSchema ? ` Expected: ${formatCompactSchemaForError(compactSchema)}.` : "";
+            return createErrorResult(
+              `Invalid properties for "${args.operatorType}": ${formatValidationErrors(validation)}.${schemaStr} ${inputInfo}`
+            );
+          }
+        }
+
+        if (!workflowUtil) {
+          return createErrorResult(`Metadata store not available for operator creation. ${inputInfo}`);
+        }
+
+        if (!/^op\d+$/.test(args.operatorId)) {
+          return createErrorResult(
+            `Invalid operatorId: "${args.operatorId}". Must follow the format "op" followed by a number (e.g., op1, op2, op3). ${inputInfo}`
+          );
+        }
+
+        const existing = workflowState.getOperator(args.operatorId);
+        if (existing) {
+          return createErrorResult(
+            `Operator with ID "${args.operatorId}" already exists. Use modifyOperator to update it, or choose a different ID. ${inputInfo}`
+          );
+        }
+
+        // Start from the type's default predicate, then apply the requested id and properties.
+        let operator = workflowUtil.getNewOperatorPredicate(args.operatorType, args.summary);
+        operator = {
+          ...operator,
+          operatorID: args.operatorId,
+          operatorProperties: { ...operator.operatorProperties, ...args.properties },
+        };
+
+        workflowState.addOperator(operator);
+        inserted = true;
+
+        const createdLinkPairs: { source: string; target: string }[] = [];
+        if (args.inputOperatorIds) {
+          // Port information is read back from the workflow after insertion.
+          const addedOperator = workflowState.getOperator(operator.operatorID)!;
+          for (const [portIndexStr, sourceOpIds] of Object.entries(args.inputOperatorIds)) {
+            const targetPortIdx = parseInt(portIndexStr, 10);
+            if (isNaN(targetPortIdx) || targetPortIdx < 0) {
+              return failAndUndo(
+                `Invalid input port index: "${portIndexStr}". Must be a non-negative integer. ${inputInfo}`
+              );
+            }
+            if (targetPortIdx >= addedOperator.inputPorts.length) {
+              return failAndUndo(
+                `Input port index ${targetPortIdx} out of range. Operator "${args.operatorId}" has ${addedOperator.inputPorts.length} input port(s). ${inputInfo}`
+              );
+            }
+            const targetPortId = addedOperator.inputPorts[targetPortIdx].portID;
+
+            for (const sourceOpId of sourceOpIds) {
+              const sourceOp = workflowState.getOperator(sourceOpId);
+              if (!sourceOp) {
+                return failAndUndo(
+                  `Source operator "${sourceOpId}" not found. Make sure it exists before referencing it in inputOperatorIds. ${inputInfo}`
+                );
+              }
+              // Links always leave from the source's first output port.
+              const sourcePortId = sourceOp.outputPorts.length > 0 ? sourceOp.outputPorts[0].portID : "output-0";
+
+              const linkId = workflowState.generateLinkId();
+              const link: OperatorLink = {
+                linkID: linkId,
+                source: { operatorID: sourceOpId, portID: sourcePortId },
+                target: { operatorID: args.operatorId, portID: targetPortId },
+              };
+              workflowState.addLink(link);
+              createdLinkPairs.push({ source: sourceOpId, target: args.operatorId });
+            }
+          }
+        }
+
+        autoLayoutWorkflow(workflowState);
+
+        const finalOperator = workflowState.getOperator(operator.operatorID) || operator;
+        const numInputPorts = finalOperator.inputPorts.length;
+        const numOutputPorts = finalOperator.outputPorts.length;
+
+        const resultMsg = formatAddOperatorResult(
+          operator.operatorID,
+          numInputPorts,
+          numOutputPorts,
+          createdLinkPairs.length > 0 ? createdLinkPairs : undefined
+        );
+
+        return createToolResult(resultMsg);
+      } catch (error: any) {
+        // An unexpected failure is reported as an error, so it must not leave the operator behind either.
+        undoInsert();
+        return createErrorResult(error.message || String(error));
+      }
+    },
+  });
+}
+
+/**
+ * Build the `modifyOperator` tool: updates an existing operator's properties
+ * and display name and, when `inputOperatorIds` is given, replaces all of its
+ * incoming links.
+ *
+ * Every check (operator exists, merged properties are valid, port indices are
+ * in range, source operators exist) runs before the workflow is touched, so a
+ * rejected call leaves the operator and its existing links exactly as they were.
+ *
+ * @param workflowState workflow being edited
+ * @param context       optional metadata store used to validate the merged properties
+ * @returns the tool definition; its `execute` resolves to a tool result or an error result
+ */
+export function createModifyOperatorTool(workflowState: WorkflowState, context?: ToolContext) {
+  return tool({
+    description: `Modify an existing operator's properties, input links, or both.
+
+Examples:
+1. Modify properties only:
+   { "operatorId": "agg", "properties": { "groupByKeys": ["city"] }, "summary": "Group by city" }
+
+2. Modify input links only (replaces all existing incoming links):
+   { "operatorId": "join_op", "inputOperatorIds": { "0": ["users"], "1": ["orders"] }, "summary": "Re-link join inputs" }
+
+3. Modify both properties and links:
+   { "operatorId": "filter", "properties": { "predicates": [...] }, "inputOperatorIds": { "0": ["cleaned"] }, "summary": "Update filter and re-link" }`,
+    inputSchema: z.object({
+      operatorId: z.string().describe("ID of the operator to modify"),
+      properties: z.record(z.any()).optional().describe("Properties to update (merged with existing)"),
+      inputOperatorIds: z
+        .record(z.array(z.string()))
+        .optional()
+        .describe(
+          "Mapping from input port index to an ordered list of source operator IDs. " +
+            "If provided, all existing incoming links are deleted and replaced with these. " +
+            'E.g. {"0": ["opA", "opB"], "1": ["opC"]} connects opA and opB to input port 0, opC to input port 1.'
+        ),
+      summary: z.string().describe("Very brief summary of operator behavior after your modification. Within 5 words"),
+    }),
+    execute: async (args: {
+      operatorId: string;
+      properties?: Record<string, any>;
+      inputOperatorIds?: Record<string, string[]>;
+      summary?: string;
+    }) => {
+      try {
+        const inputInfo = formatInputArgs(args);
+
+        const operator = workflowState.getOperator(args.operatorId);
+        if (!operator) return createErrorResult(`Operator ${args.operatorId} not found. ${inputInfo}`);
+
+        // Validate the properties as they would look after the merge.
+        if (args.properties && context?.metadataStore) {
+          const mergedProperties = { ...operator.operatorProperties, ...args.properties };
+          const validation = context.metadataStore.validateOperatorProperties(operator.operatorType, mergedProperties);
+          if (!validation.isValid) {
+            const compactSchema = context.metadataStore.getCompactSchema(operator.operatorType);
+            const schemaStr = compactSchema ? ` Expected: ${formatCompactSchemaForError(compactSchema)}.` : "";
+            return createErrorResult(
+              `Invalid properties for "${operator.operatorType}": ${formatValidationErrors(validation)}.${schemaStr} ${inputInfo}`
+            );
+          }
+        }
+
+        // Resolve and validate the requested links up front; nothing has been
+        // mutated yet, so returning an error here is side-effect free.
+        // NOTE(review): port ids are read before the property update; this
+        // assumes updating properties does not change the operator's input ports — confirm.
+        const plannedLinks: { sourceOpId: string; sourcePortId: string; targetPortId: string }[] = [];
+        if (args.inputOperatorIds) {
+          for (const [portIndexStr, sourceOpIds] of Object.entries(args.inputOperatorIds)) {
+            const targetPortIdx = parseInt(portIndexStr, 10);
+            if (isNaN(targetPortIdx) || targetPortIdx < 0) {
+              return createErrorResult(
+                `Invalid input port index: "${portIndexStr}". Must be a non-negative integer. ${inputInfo}`
+              );
+            }
+            if (targetPortIdx >= operator.inputPorts.length) {
+              return createErrorResult(
+                `Input port index ${targetPortIdx} out of range. Operator "${args.operatorId}" has ${operator.inputPorts.length} input port(s). ${inputInfo}`
+              );
+            }
+            const targetPortId = operator.inputPorts[targetPortIdx].portID;
+
+            for (const sourceOpId of sourceOpIds) {
+              const sourceOp = workflowState.getOperator(sourceOpId);
+              if (!sourceOp) {
+                return createErrorResult(
+                  `Source operator "${sourceOpId}" not found. Make sure it exists before referencing it in inputOperatorIds. ${inputInfo}`
+                );
+              }
+              // Links always leave from the source's first output port.
+              const sourcePortId = sourceOp.outputPorts.length > 0 ? sourceOp.outputPorts[0].portID : "output-0";
+              plannedLinks.push({ sourceOpId, sourcePortId, targetPortId });
+            }
+          }
+        }
+
+        const createdLinkPairs: { source: string; target: string }[] = [];
+        const deletedLinkPairs: { source: string; target: string }[] = [];
+
+        if (args.properties) {
+          workflowState.updateOperatorProperties(args.operatorId, args.properties);
+        }
+
+        if (args.summary) {
+          workflowState.updateOperatorDisplayName(args.operatorId, args.summary);
+        }
+
+        if (args.inputOperatorIds) {
+          // Replace semantics: drop every incoming link, then add the planned ones.
+          const currentLinks = workflowState
+            .getLinksConnectedToOperator(args.operatorId)
+            .filter(link => link.target.operatorID === args.operatorId);
+          for (const link of currentLinks) {
+            deletedLinkPairs.push({ source: link.source.operatorID, target: link.target.operatorID });
+            workflowState.deleteLink(link.linkID);
+          }
+
+          for (const planned of plannedLinks) {
+            const link: OperatorLink = {
+              linkID: workflowState.generateLinkId(),
+              source: { operatorID: planned.sourceOpId, portID: planned.sourcePortId },
+              target: { operatorID: args.operatorId, portID: planned.targetPortId },
+            };
+            workflowState.addLink(link);
+            createdLinkPairs.push({ source: planned.sourceOpId, target: args.operatorId });
+          }
+
+          autoLayoutWorkflow(workflowState);
+        }
+
+        const resultMsg = formatModifyOperatorResult(
+          args.operatorId,
+          createdLinkPairs.length > 0 ? createdLinkPairs : undefined,
+          deletedLinkPairs.length > 0 ? deletedLinkPairs : undefined
+        );
+
+        return createToolResult(resultMsg);
+      } catch (error: any) {
+        return createErrorResult(formatOperatorError(args.operatorId, error.message || String(error)));
+      }
+    },
+  });
+}
+
+/**
+ * Build the `deleteOperator` tool, which removes one operator from the
+ * workflow and reports whether an operator with that id existed.
+ *
+ * @param workflowState workflow being edited
+ * @param _context      unused; accepted for signature parity with the other tool factories
+ */
+export function createDeleteOperatorTool(workflowState: WorkflowState, _context?: ToolContext) {
+  const removeOperator = async (args: { operatorId: string }) => {
+    const targetId = args.operatorId;
+    try {
+      return workflowState.deleteOperator(targetId)
+        ? createToolResult(`Deleted operator: ${targetId}`)
+        : createErrorResult(`Operator ${targetId} not found`);
+    } catch (failure: any) {
+      return createErrorResult(failure.message || String(failure));
+    }
+  };
+
+  return tool({
+    description: "Delete an operator from the workflow. This also deletes all connected links.",
+    inputSchema: z.object({
+      operatorId: z.string().describe("ID of the operator to delete"),
+    }),
+    execute: removeOperator,
+  });
+}
diff --git a/agent-service/src/agent/tools/workflow-execution-tools.ts
b/agent-service/src/agent/tools/workflow-execution-tools.ts
new file mode 100644
index 0000000000..15fa81ff97
--- /dev/null
+++ b/agent-service/src/agent/tools/workflow-execution-tools.ts
@@ -0,0 +1,604 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { z } from "zod";
+import { tool } from "ai";
+import { createErrorResult, formatExecuteOperatorResult } from
"./tools-utility";
+import type { WorkflowState } from "../workflow-state";
+import { getBackendConfig } from "../../api/backend-api";
+import { env } from "../../config/env";
+import type { LogicalPlan, LogicalLink } from "../../api/execution-api";
+import type { OperatorInfo, SyncExecutionResult } from "../../types/execution";
+import { WorkflowSystemMetadata } from "../util/workflow-system-metadata";
+import { DEFAULT_AGENT_SETTINGS } from "../../types/agent";
+import { createLogger } from "../../logger";
+
+const log = createLogger("ExecutionTools");
+
+export const TOOL_NAME_EXECUTE_OPERATOR = "executeOperator";
+
+/**
+ * Per-call settings used when submitting a workflow execution request.
+ */
+export interface ExecutionConfig {
+ // Sent as the Bearer token in the Authorization header of the execution request.
+ userToken: string;
+ // Workflow being executed; part of the request URL and the key of the per-workflow execution lock.
+ workflowId: number;
+ // Computing unit to run on; 0 is used in the request when omitted.
+ computingUnitId?: number;
+ // Optional result-size overrides; the DEFAULT_AGENT_SETTINGS values are used when omitted.
+ maxOperatorResultCharLimit?: number;
+ maxOperatorResultCellCharLimit?: number;
+ // Execution timeout in milliseconds; rounded up to whole seconds before being sent.
+ executionTimeoutMs?: number;
+}
+
+/**
+ * FIFO async lock used to serialize workflow executions per workflow id.
+ *
+ * `acquire()` resolves with a release function once prior holders have
+ * released. Callers must invoke the release in a `finally` to avoid
+ * deadlocking subsequent waiters.
+ */
+class AsyncMutex {
+ // Tail of the wait chain: resolves when the most recent acquirer calls its release function.
+ private queue: Promise<void> = Promise.resolve();
+
+ async acquire(): Promise<() => void> {
+ let release: () => void;
+ // Capture the previous tail before replacing it, so this caller waits only on earlier holders.
+ const currentQueue = this.queue;
+
+ // Install the new tail synchronously (before the await below) so later callers queue behind this one.
+ this.queue = new Promise<void>(resolve => {
+ release = resolve;
+ });
+
+ await currentQueue;
+
+ // The Promise executor above runs synchronously, so `release` has been assigned by now.
+ return release!;
+ }
+}
+
+// One mutex per workflow id, created lazily on first use. Nothing in this module removes entries.
+const workflowMutexes = new Map<number, AsyncMutex>();
+
+/** Returns the mutex registered for `workflowId`, registering a new one if none exists yet. */
+function getWorkflowMutex(workflowId: number): AsyncMutex {
+  const existing = workflowMutexes.get(workflowId);
+  if (existing) {
+    return existing;
+  }
+  const created = new AsyncMutex();
+  workflowMutexes.set(workflowId, created);
+  return created;
+}
+
+// Outcome of validating the enabled operators of a workflow.
+// `errors` maps an operator id to that operator's messages (message key -> text);
+// only operators that failed validation get an entry.
+interface WorkflowValidationResult {
+ isValid: boolean;
+ errors: Record<string, Record<string, string>>;
+}
+
+// Outcome of one validation check on one operator; `messages` maps a message key to its text.
+interface OperatorValidation {
+ isValid: boolean;
+ messages: Record<string, string>;
+}
+
+/**
+ * Checks an operator's properties through the shared WorkflowSystemMetadata instance.
+ * Messages are only carried over when the check fails.
+ */
+function validateOperatorSchema(operatorType: string, operatorProperties: Record<string, any>): OperatorValidation {
+  const outcome = WorkflowSystemMetadata.getInstance().validateOperatorProperties(operatorType, operatorProperties);
+  if (outcome.isValid) {
+    return { isValid: true, messages: {} };
+  }
+  return { isValid: false, messages: outcome.messages };
+}
+
+/**
+ * Checks that every input port of `operatorId` has an acceptable number of incoming links:
+ * exactly one for ports with `disallowMultiInputs`, at least one for all others.
+ * All violations are reported together under the `inputs` message key.
+ */
+function validateOperatorConnection(operatorId: string, workflowState: WorkflowState): OperatorValidation {
+  const target = workflowState.getOperator(operatorId);
+  if (!target) {
+    return { isValid: false, messages: { error: `Operator ${operatorId} not found` } };
+  }
+
+  // Tally incoming links per input port of this operator.
+  const incomingByPort = workflowState
+    .getAllLinks()
+    .filter(link => link.target.operatorID === operatorId)
+    .reduce(
+      (tally, link) => tally.set(link.target.portID, (tally.get(link.target.portID) ?? 0) + 1),
+      new Map<string, number>()
+    );
+
+  const violations: string[] = [];
+  for (const port of target.inputPorts) {
+    const linkCount = incomingByPort.get(port.portID) ?? 0;
+    const label = port.displayName ?? port.portID;
+    if (port.disallowMultiInputs) {
+      if (linkCount !== 1) {
+        violations.push(`${label} requires 1 input, has ${linkCount}.`);
+      }
+    } else if (linkCount < 1) {
+      violations.push(`${label} requires at least 1 input, has ${linkCount}.`);
+    }
+  }
+
+  if (violations.length === 0) {
+    return { isValid: true, messages: {} };
+  }
+  return { isValid: false, messages: { inputs: violations.join(" ").trim() } };
+}
+
+/**
+ * Merges several validation outcomes into one. The result is valid only when every input is
+ * valid; messages are taken from the failing inputs only, later inputs winning on key clashes.
+ */
+function combineValidations(...validations: OperatorValidation[]): OperatorValidation {
+  const failed = validations.filter(candidate => !candidate.isValid);
+  const messages: Record<string, string> = Object.assign({}, ...failed.map(candidate => candidate.messages));
+  return { isValid: failed.length === 0, messages };
+}
+
+/**
+ * Runs the schema and connection checks on every enabled operator and collects the
+ * messages of the operators that fail, keyed by operator id.
+ */
+function validateWorkflow(workflowState: WorkflowState): WorkflowValidationResult {
+  const errors: Record<string, Record<string, string>> = {};
+
+  workflowState
+    .getAllEnabledOperators()
+    .map(op => ({
+      id: op.operatorID,
+      verdict: combineValidations(
+        validateOperatorSchema(op.operatorType, op.operatorProperties),
+        validateOperatorConnection(op.operatorID, workflowState)
+      ),
+    }))
+    .filter(({ verdict }) => !verdict.isValid)
+    .forEach(({ id, verdict }) => {
+      errors[id] = verdict.messages;
+    });
+
+  return { isValid: Object.keys(errors).length === 0, errors };
+}
+
+/**
+ * Renders a failed validation result as a multi-line report: a heading, then one line per
+ * operator followed by one line per message. Returns "" for a valid result.
+ */
+function formatWorkflowValidationErrors(validationResult: WorkflowValidationResult): string {
+  if (validationResult.isValid) {
+    return "";
+  }
+
+  const perOperator = Object.entries(validationResult.errors).flatMap(([operatorId, fieldErrors]) => [
+    ` Operator ${operatorId}:`,
+    ...Object.entries(fieldErrors).map(([field, message]) => ` - ${field}: ${message}`),
+  ]);
+
+  return ["Workflow validation failed:", ...perOperator].join("\n");
+}
+
+/**
+ * Converts the current workflow state into the logical plan sent to the execution endpoint.
+ *
+ * - When exactly one operator id is given in `opsToViewResult`, the plan contains the
+ *   operators and links returned by `workflowState.getSubDAG` for that operator.
+ * - Otherwise the plan contains every enabled operator and only those links whose two
+ *   endpoints are both in the plan, so the plan never references an operator it does not
+ *   contain (e.g. a link touching an operator that is not enabled).
+ *
+ * Link port ids are the ordinal positions of the ports in the operator's input/output port
+ * lists; an unknown operator or port falls back to ordinal 0.
+ *
+ * @param workflowState source of operators and links
+ * @param opsToViewResult operator ids whose results are requested; ids that are not in the
+ *   plan are dropped. When omitted or empty, the operators without an outgoing link inside
+ *   the plan are used instead.
+ * @returns the plan's operators, links and result-view operator ids
+ */
+function buildLogicalPlan(workflowState: WorkflowState, opsToViewResult?: string[]): LogicalPlan {
+  const targetOperatorId = opsToViewResult && opsToViewResult.length === 1 ? opsToViewResult[0] : undefined;
+
+  // Ordinal of `portID` within `ports`; 0 when the port is not listed.
+  const ordinalIn = (ports: readonly { portID: string }[], portID: string): number => {
+    const idx = ports.findIndex(port => port.portID === portID);
+    return idx >= 0 ? idx : 0;
+  };
+
+  const subDAG = targetOperatorId ? workflowState.getSubDAG(targetOperatorId) : null;
+  const planOperators = subDAG !== null ? subDAG.operators : workflowState.getAllEnabledOperators();
+
+  const operatorsList: { operatorID: string; operatorType: string; [key: string]: any }[] = planOperators.map(
+    op => ({
+      ...op.operatorProperties,
+      operatorID: op.operatorID,
+      operatorType: op.operatorType,
+      inputPorts: op.inputPorts,
+      outputPorts: op.outputPorts,
+    })
+  );
+  const operatorIds = new Set(operatorsList.map(op => op.operatorID));
+
+  // The sub-DAG links are used as returned; for the full workflow, drop links that touch an
+  // operator outside the plan.
+  const planLinks =
+    subDAG !== null
+      ? subDAG.links
+      : workflowState
+          .getAllLinks()
+          .filter(link => operatorIds.has(link.source.operatorID) && operatorIds.has(link.target.operatorID));
+
+  const linksList: LogicalLink[] = planLinks.map(link => {
+    const sourceOp = workflowState.getOperator(link.source.operatorID);
+    const targetOp = workflowState.getOperator(link.target.operatorID);
+    return {
+      fromOpId: link.source.operatorID,
+      fromPortId: { id: sourceOp ? ordinalIn(sourceOp.outputPorts, link.source.portID) : 0, internal: false },
+      toOpId: link.target.operatorID,
+      toPortId: { id: targetOp ? ordinalIn(targetOp.inputPorts, link.target.portID) : 0, internal: false },
+    };
+  });
+
+  let allOpsToView: string[];
+  if (opsToViewResult && opsToViewResult.length > 0) {
+    allOpsToView = opsToViewResult.filter(id => operatorIds.has(id));
+  } else {
+    // Default to the sinks of the plan: operators that are not the source of any plan link.
+    const sourceIds = new Set(linksList.map(link => link.fromOpId));
+    allOpsToView = operatorsList.filter(op => !sourceIds.has(op.operatorID)).map(op => op.operatorID);
+  }
+
+  return {
+    operators: operatorsList,
+    links: linksList,
+    opsToViewResult: allOpsToView,
+  };
+}
+
+/**
+ * POSTs a logical plan to the execution endpoint and returns the parsed response body.
+ *
+ * Any failure other than an abort (network error, non-OK status, unparsable body) is logged
+ * and converted into a result with `success: false` and `state: "Error"`. An `AbortError`
+ * is rethrown.
+ */
+async function executeWorkflowHttp(
+  config: ExecutionConfig,
+  logicalPlan: LogicalPlan,
+  options: { abortSignal?: AbortSignal } = {}
+): Promise<SyncExecutionResult> {
+  const backendConfig = getBackendConfig();
+  const { workflowId } = config;
+  const computingUnitId = config.computingUnitId ?? 0;
+
+  // In k8s each computing unit is a separate pod, so the endpoint varies per cuid.
+  const endpointTemplate = env.EXECUTION_ENDPOINT_TEMPLATE;
+  const executionEndpoint = endpointTemplate
+    ? endpointTemplate.replace("{cuid}", String(computingUnitId))
+    : backendConfig.executionEndpoint;
+  const url = `${executionEndpoint}/api/execution/${workflowId}/${computingUnitId}/run`;
+
+  // A missing (or zero) timeout falls back to the default agent setting.
+  const timeoutMs = config.executionTimeoutMs || DEFAULT_AGENT_SETTINGS.executionTimeoutMs;
+  const viewedOperatorIds = logicalPlan.opsToViewResult || [];
+
+  const request = {
+    executionName: "agent-execution",
+    logicalPlan: {
+      operators: logicalPlan.operators,
+      links: logicalPlan.links,
+      opsToViewResult: viewedOperatorIds,
+      opsToReuseResult: [],
+    },
+    targetOperatorIds: viewedOperatorIds,
+    timeoutSeconds: Math.ceil(timeoutMs / 1000),
+    maxOperatorResultCharLimit: config.maxOperatorResultCharLimit ?? DEFAULT_AGENT_SETTINGS.maxOperatorResultCharLimit,
+    maxOperatorResultCellCharLimit:
+      config.maxOperatorResultCellCharLimit ?? DEFAULT_AGENT_SETTINGS.maxOperatorResultCellCharLimit,
+  };
+
+  log.debug(
+    {
+      url,
+      maxOperatorResultCharLimit: request.maxOperatorResultCharLimit,
+      maxOperatorResultCellCharLimit: request.maxOperatorResultCellCharLimit,
+    },
+    "executing workflow"
+  );
+
+  try {
+    const response = await fetch(url, {
+      method: "POST",
+      headers: {
+        "Content-Type": "application/json",
+        Authorization: `Bearer ${config.userToken}`,
+      },
+      body: JSON.stringify(request),
+      signal: options.abortSignal,
+    });
+
+    if (response.ok) {
+      return (await response.json()) as SyncExecutionResult;
+    }
+
+    const errorText = await response.text();
+    throw new Error(`Execution request failed: ${response.status} ${response.statusText} - ${errorText}`);
+  } catch (error) {
+    const aborted = error instanceof Error && error.name === "AbortError";
+    if (aborted) {
+      throw error;
+    }
+    log.error({ err: error }, "execution failed");
+    return {
+      success: false,
+      state: "Error",
+      operators: {},
+      errors: [error instanceof Error ? error.message : "Unknown error"],
+    };
+  }
+}
+
+/**
+ * Renders the shape summary shown with an operator's result.
+ *
+ * Always produces the output line `Output table shape: (rows, columns)`, where rows is
+ * `totalRowCount` when present and `outputTuples` otherwise. When the operator reports input
+ * port shapes, an input line is put in front of it, listing each port in ascending port index
+ * as `<upstream operator id>(rows, columns)`, or `input<portIndex>(...)` when no link into
+ * that port is found.
+ *
+ * @param workflowState used to resolve which upstream operator feeds each input port
+ * @param operatorId operator whose result is being described
+ * @param opInfo execution info of that operator; it is not modified
+ * @param outputColumns number of columns of the output table
+ */
+function formatInputOutput(
+  workflowState: WorkflowState,
+  operatorId: string,
+  opInfo: OperatorInfo,
+  outputColumns: number
+): string {
+  const outputRows = opInfo.totalRowCount ?? opInfo.outputTuples;
+  const outputLine = `Output table shape: (${outputRows}, ${outputColumns})`;
+
+  const inputShapes = opInfo.inputPortShapes;
+  if (!inputShapes || inputShapes.length === 0) {
+    return outputLine;
+  }
+
+  // Map each input port ordinal of this operator to the operator feeding it.
+  const op = workflowState.getOperator(operatorId);
+  const portIndexToUpstream = new Map<number, string>();
+  for (const link of workflowState.getAllLinks()) {
+    if (link.target.operatorID !== operatorId) continue;
+    const portIdx = op?.inputPorts.findIndex(p => p.portID === link.target.portID) ?? -1;
+    if (portIdx >= 0) {
+      portIndexToUpstream.set(portIdx, link.source.operatorID);
+    }
+  }
+
+  // Sort a copy: Array.prototype.sort works in place, and `inputShapes` belongs to the
+  // caller's OperatorInfo, which must not be reordered as a side effect of formatting.
+  const inputPart = [...inputShapes]
+    .sort((a, b) => a.portIndex - b.portIndex)
+    .map(p => {
+      const name = portIndexToUpstream.get(p.portIndex) ?? `input${p.portIndex}`;
+      return `${name}(${p.rows}, ${p.columns})`;
+    })
+    .join(", ");
+
+  return `Input operator(table shape): ${inputPart}\n${outputLine}`;
+}
+
+/**
+ * Builds the multi-line failure report for an execution. The heading line is always present;
+ * each of the three sections (compilation, per-operator, general) is added only when it has
+ * at least one entry.
+ */
+function formatExecutionError(
+  compilationErrors?: Record<string, string>,
+  operatorErrors?: Array<{ operatorId: string; error: string }>,
+  generalErrors?: string[]
+): string {
+  // A section contributes its title plus entries, or nothing at all when it is empty.
+  const section = (title: string, entries: string[]): string[] => (entries.length > 0 ? [title, ...entries] : []);
+
+  const compilationLines = Object.entries(compilationErrors ?? {}).map(([key, value]) => ` ${key}: ${value}`);
+  const operatorLines = (operatorErrors ?? []).map(({ operatorId, error }) => ` ${operatorId}: ${error}`);
+  const generalLines = (generalErrors ?? []).map(error => ` ${error}`);
+
+  return [
+    "Execution failed due to the following error:",
+    ...section("Compilation error:", compilationLines),
+    ...section("Execution error:", operatorLines),
+    ...section("Error:", generalLines),
+  ].join("\n");
+}
+
+/**
+ * Renders result records as a tab-separated table with a leading index column.
+ *
+ * Columns come from the keys of the first record, minus `__row_index__`. When the first
+ * record carries `__row_index__`, that value is used as each row's index (otherwise the array
+ * position), and a row of "..." is inserted wherever consecutive indices are not adjacent.
+ * Returns "" for a missing or empty input.
+ */
+function jsonToTableFormat(jsonResult: Record<string, any>[]): string {
+  if (!jsonResult || jsonResult.length === 0) return "";
+
+  const firstRow = jsonResult[0];
+  const indexed = "__row_index__" in firstRow;
+  const headers = Object.keys(firstRow).filter(h => h !== "__row_index__");
+
+  // null and the literal string "NULL" both render as "NaN"; tabs and newlines inside
+  // strings are escaped so one record always stays on one line.
+  const renderCell = (val: any): string => {
+    if (val === null) return "NaN";
+    switch (typeof val) {
+      case "undefined":
+        return "";
+      case "number":
+      case "boolean":
+        return String(val);
+      case "string":
+        return val === "NULL" ? "NaN" : val.replace(/\t/g, "\\t").replace(/\n/g, "\\n");
+      default:
+        return JSON.stringify(val);
+    }
+  };
+
+  // Leading tab aligns headers with the index column (pandas __repr__ style).
+  const lines: string[] = ["\t" + headers.join("\t")];
+  let lastIndex = -1;
+
+  for (const [position, row] of jsonResult.entries()) {
+    const rowIndex = indexed ? (row["__row_index__"] as number) : position;
+
+    if (lastIndex >= 0 && rowIndex > lastIndex + 1) {
+      lines.push(`...\t${headers.map(() => "...").join("\t")}`);
+    }
+    lastIndex = rowIndex;
+
+    lines.push(`${rowIndex}\t${headers.map(h => renderCell(row[h])).join("\t")}`);
+  }
+
+  return lines.join("\n");
+}
+
+/**
+ * Builds a logical plan targeting `operatorId`, runs it through the execution endpoint and
+ * renders that operator's result as text.
+ *
+ * Calls for the same `config.workflowId` are serialized through a per-workflow mutex that is
+ * released in `finally`. Failures are returned as error results (via `createErrorResult`)
+ * rather than thrown; the one exception is an `AbortError`, which is rethrown.
+ *
+ * `options.onResult` is invoked with the target operator's info when the execution or the
+ * target operator fails, and with every error-free operator of the response on success.
+ * `options.onResultLegacy` is accepted but not referenced in this function body.
+ *
+ * On success the returned text is the brief summary line, the shape line, any warnings and
+ * the tab-separated result table; `"(no result data)"` is returned when the operator has no
+ * result array.
+ */
+export async function executeOperatorAndFormat(
+ workflowState: WorkflowState,
+ config: ExecutionConfig,
+ operatorId: string,
+ options: {
+ abortSignal?: AbortSignal;
+ onResult?: (operatorId: string, operatorInfo: OperatorInfo) => void;
+ onResultLegacy?: (operatorId: string, backendStats?: Record<string,
string>) => void;
+ } = {}
+): Promise<string> {
+ // Serialize executions per workflow to avoid
ConcurrentModificationException on the backend.
+ const release = await getWorkflowMutex(config.workflowId).acquire();
+
+ try {
+ const logicalPlan = buildLogicalPlan(workflowState, [operatorId]);
+
+ if (logicalPlan.operators.length === 0) {
+ return createErrorResult("Cannot execute: workflow has no operators.");
+ }
+
+ // Only block on the target operator's validation errors; upstream issues
will
+ // surface as runtime errors that correctly identify the failing operator.
+ const validationResult = validateWorkflow(workflowState);
+ if (!validationResult.isValid) {
+ const targetErrors = validationResult.errors[operatorId];
+ if (targetErrors) {
+ const lines = [`Operator ${operatorId}:`];
+ for (const [field, message] of Object.entries(targetErrors)) {
+ lines.push(` - ${field}: ${message}`);
+ }
+ return createErrorResult(lines.join("\n"));
+ }
+ }
+
+ const result: SyncExecutionResult = await executeWorkflowHttp(config,
logicalPlan, {
+ abortSignal: options.abortSignal,
+ });
+
+ if (!result.success) {
+ // Pick the error detail that matches the reported terminal state.
+ const compilationErrors =
+ result.state === "CompilationFailed" || result.state ===
"ValidationFailed"
+ ? result.compilationErrors
+ : undefined;
+
+ const operatorErrors =
+ result.state === "Failed"
+ ? Object.entries(result.operators)
+ .filter(([_, op]) => op.error)
+ .map(([opId, op]) => ({ operatorId: opId, error: op.error! }))
+ : undefined;
+
+ const generalErrors = result.state === "Killed" ? ["Workflow execution
was killed (timeout)."] : result.errors;
+
+ const errorText = formatExecutionError(compilationErrors,
operatorErrors, generalErrors);
+
+ // Report the failure against the requested operator, with zeroed tuple counts.
+ if (options.onResult) {
+ const errorInfo: OperatorInfo = {
+ state: result.state,
+ inputTuples: 0,
+ outputTuples: 0,
+ resultMode: "table",
+ error: errorText,
+ };
+ options.onResult(operatorId, errorInfo);
+ }
+
+ return createErrorResult(errorText);
+ }
+
+ const opInfo = result.operators[operatorId];
+ if (!opInfo) {
+ return createErrorResult(
+ formatExecutionError(undefined, undefined, [`No result found for
operator: ${operatorId}`])
+ );
+ }
+
+ if (opInfo.error) {
+ if (options.onResult) {
+ options.onResult(operatorId, opInfo);
+ }
+ return createErrorResult(formatExecutionError(undefined, [{ operatorId,
error: opInfo.error }]));
+ }
+
+ // Note: this early return happens before the onResult notifications below.
+ if (!opInfo.result || !Array.isArray(opInfo.result)) {
+ return "(no result data)";
+ }
+
+ const jsonArray = opInfo.result as Record<string, any>[];
+ const headers = jsonArray.length > 0 ? Object.keys(jsonArray[0]).filter(k
=> k !== "__row_index__") : [];
+ const columns = headers.length;
+
+ // Notify for every operator in the execution so upstream stats are also
stored.
+ if (options.onResult) {
+ for (const [opId, info] of Object.entries(result.operators)) {
+ if (info && !info.error) {
+ options.onResult(opId, info);
+ }
+ }
+ }
+
+ let dataString = jsonToTableFormat(jsonArray);
+
+ // Safety-net: TSV serialization may add padding beyond backend's
raw-record budget.
+ const charLimit = config.maxOperatorResultCharLimit ??
DEFAULT_AGENT_SETTINGS.maxOperatorResultCharLimit;
+
+ if (dataString.length > charLimit) {
+ const allLines = dataString.split("\n");
+ const headerLine = allLines[0];
+ const dataRows = allLines.slice(1);
+
+ // +1 accounts for the newline that follows the header (and each row below).
+ const reservedSize = headerLine.length + 1;
+
+ // The budget left after the header is split evenly between leading and trailing rows.
+ const halfLimit = Math.floor((charLimit - reservedSize) / 2);
+
+ // Take rows from the front until the half budget is used; at least one row is always kept.
+ let frontSize = 0;
+ const frontRows: string[] = [];
+ for (const row of dataRows) {
+ const rowLen = row.length + 1;
+ if (frontSize + rowLen > halfLimit && frontRows.length > 0) break;
+ frontRows.push(row);
+ frontSize += rowLen;
+ }
+
+ // Take rows from the back the same way, never going past the rows already kept in front.
+ let backSize = 0;
+ const backRows: string[] = [];
+ for (let i = dataRows.length - 1; i >= frontRows.length; i--) {
+ const rowLen = dataRows[i].length + 1;
+ if (backSize + rowLen > halfLimit && backRows.length > 0) break;
+ backRows.unshift(dataRows[i]);
+ backSize += rowLen;
+ }
+
+ const keptRows = [...frontRows, ...backRows];
+ dataString = [headerLine, ...keptRows].join("\n");
+ }
+
+ const shapeLine = formatInputOutput(workflowState, operatorId, opInfo,
columns);
+
+ const warningLines = opInfo.warnings?.map(w => w) ?? [];
+
+ const metadataLines = [shapeLine, ...warningLines].filter(Boolean);
+
+ const briefSummary = formatExecuteOperatorResult(operatorId);
+ return [briefSummary, ...metadataLines,
dataString].filter(Boolean).join("\n");
+ } catch (error: any) {
+ if (error.name === "AbortError") {
+ throw error;
+ }
+ return createErrorResult(`Execution failed: ${error.message ||
String(error)}`);
+ } finally {
+ // Always hand the per-workflow lock to the next waiter, including on early returns and rethrows.
+ release();
+ }
+}
+
+/**
+ * Builds the agent tool that executes the workflow for one operator and returns the
+ * formatted result. `getConfig` is called on every invocation, so each execution uses the
+ * configuration current at that moment; `onResult` is forwarded to the execution helper.
+ */
+export function createExecuteOperatorTool(
+  workflowState: WorkflowState,
+  getConfig: () => ExecutionConfig,
+  onResult?: (operatorId: string, operatorInfo: OperatorInfo) => void
+) {
+  const description =
+    "Execute the workflow and get the specified operator's result. The execution result(if succeeded) includes the shape of the input tables(if any) and output table, and the records in the output table";
+
+  return tool({
+    description,
+    inputSchema: z.object({
+      operatorId: z.string().describe("The operator ID to view result for."),
+    }),
+    execute: async (args: { operatorId: string }, options: { abortSignal?: AbortSignal }) => {
+      const currentConfig = getConfig();
+      const callOptions = { ...options, onResult };
+      return await executeOperatorAndFormat(workflowState, currentConfig, args.operatorId, callOptions);
+    },
+  });
+}
diff --git a/agent-service/src/agent/util/auto-layout.ts
b/agent-service/src/agent/util/auto-layout.ts
new file mode 100644
index 0000000000..b17fed4e08
--- /dev/null
+++ b/agent-service/src/agent/util/auto-layout.ts
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import dagre from "dagre";
+import type { WorkflowState } from "../workflow-state";
+
+// Values mirror frontend joint-graph-wrapper.ts so agent-generated and
+// user-generated layouts visually match.
+// Graph-level options handed to dagre via `graph.setGraph`.
+const LAYOUT_CONFIG: dagre.GraphLabel = {
+ nodesep: 100,
+ edgesep: 150,
+ ranksep: 100,
+ ranker: "tight-tree",
+ rankdir: "LR",
+};
+
+// Fixed size registered for every operator node when the layout is computed.
+const NODE_WIDTH = 200;
+const NODE_HEIGHT = 80;
+
+/**
+ * Recomputes operator positions with dagre and writes them back to the workflow state.
+ * Every operator becomes a fixed-size node and every link an edge; a workflow without
+ * operators is left untouched.
+ */
+export function autoLayoutWorkflow(workflowState: WorkflowState): void {
+  const operators = workflowState.getAllOperators();
+  const links = workflowState.getAllLinks();
+  if (operators.length === 0) return;
+
+  const layoutGraph = new dagre.graphlib.Graph();
+  layoutGraph.setGraph(LAYOUT_CONFIG);
+  layoutGraph.setDefaultEdgeLabel(() => ({}));
+
+  for (const { operatorID } of operators) {
+    layoutGraph.setNode(operatorID, { width: NODE_WIDTH, height: NODE_HEIGHT });
+  }
+  for (const { source, target } of links) {
+    layoutGraph.setEdge(source.operatorID, target.operatorID);
+  }
+
+  dagre.layout(layoutGraph);
+
+  // Copy the computed coordinates back; operators dagre has no node for are skipped.
+  for (const { operatorID } of operators) {
+    const placed = layoutGraph.node(operatorID);
+    if (!placed) continue;
+    workflowState.updateOperatorPosition(operatorID, { x: placed.x, y: placed.y });
+  }
+}
diff --git a/agent-service/src/agent/workflow-result-state.test.ts
b/agent-service/src/agent/workflow-result-state.test.ts
new file mode 100644
index 0000000000..b2e46fd0d9
--- /dev/null
+++ b/agent-service/src/agent/workflow-result-state.test.ts
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { describe, expect, test } from "bun:test";
+import { WorkflowResultState } from "./workflow-result-state";
+import type { OperatorInfo } from "../types/execution";
+
+/** Test fixture: a completed table-mode operator info that varies only in `outputTuples`. */
+function makeInfo(outputTuples: number): OperatorInfo {
+  const info: OperatorInfo = { state: "Completed", inputTuples: 0, outputTuples, resultMode: "table" };
+  return info;
+}
+
+// Each test injects the ancestor path through a closure, so reassigning `path` simulates
+// checking out a different step without rebuilding the state under test.
+describe("WorkflowResultState - ancestor walk", () => {
+ test("returns the most recent ancestor entry", () => {
+ let path: string[] = [];
+ const state = new WorkflowResultState(() => path);
+
+ state.set("op1", "step-A", makeInfo(1));
+ state.set("op1", "step-B", makeInfo(2));
+ state.set("op1", "step-C", makeInfo(3));
+
+ path = ["step-A", "step-B", "step-C"];
+ expect(state.get("op1")?.operatorInfo.outputTuples).toBe(3);
+
+ // Rewind to step-B; step-C is no longer an ancestor.
+ path = ["step-A", "step-B"];
+ expect(state.get("op1")?.operatorInfo.outputTuples).toBe(2);
+
+ // Rewind further.
+ path = ["step-A"];
+ expect(state.get("op1")?.operatorInfo.outputTuples).toBe(1);
+ });
+
+ test("returns undefined when no ancestor has a result", () => {
+ // The only stored snapshot is on step-A, which is not on the path.
+ const state = new WorkflowResultState(() => ["step-X"]);
+ state.set("op1", "step-A", makeInfo(1));
+ expect(state.get("op1")).toBeUndefined();
+ });
+
+ test("returns undefined for unknown operator", () => {
+ const state = new WorkflowResultState(() => ["step-A"]);
+ expect(state.get("missing")).toBeUndefined();
+ });
+
+ test("getAllVisible returns one entry per operator on the current branch",
() => {
+ let path: string[] = [];
+ const state = new WorkflowResultState(() => path);
+
+ // op1 has results on step-A and step-C; the branch only goes through A
and B.
+ state.set("op1", "step-A", makeInfo(1));
+ state.set("op1", "step-C", makeInfo(99));
+ state.set("op2", "step-B", makeInfo(7));
+
+ path = ["step-A", "step-B"];
+ const visible = state.getAllVisible();
+ expect(visible.size).toBe(2);
+ // The step-C snapshot (99) must stay hidden because step-C is off the path.
+ expect(visible.get("op1")?.operatorInfo.outputTuples).toBe(1);
+ expect(visible.get("op2")?.operatorInfo.outputTuples).toBe(7);
+ });
+
+ test("clear drops all stored results", () => {
+ const state = new WorkflowResultState(() => ["step-A"]);
+ state.set("op1", "step-A", makeInfo(1));
+ state.clear();
+ expect(state.get("op1")).toBeUndefined();
+ expect(state.getAllVisible().size).toBe(0);
+ });
+
+ test("set on the same step overwrites", () => {
+ const state = new WorkflowResultState(() => ["step-A"]);
+ state.set("op1", "step-A", makeInfo(1));
+ state.set("op1", "step-A", makeInfo(42));
+ expect(state.get("op1")?.operatorInfo.outputTuples).toBe(42);
+ });
+});
diff --git a/agent-service/src/agent/workflow-result-state.ts
b/agent-service/src/agent/workflow-result-state.ts
new file mode 100644
index 0000000000..e6f13c2301
--- /dev/null
+++ b/agent-service/src/agent/workflow-result-state.ts
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import type { OperatorInfo } from "../types/execution";
+
+// One stored result snapshot: the operator info together with the id of the step it was recorded under.
+interface ResultEntry {
+ operatorInfo: OperatorInfo;
+ stepId: string;
+}
+
+/**
+ * Versioned per-operator execution results keyed by step id.
+ *
+ * Each operator can have multiple result snapshots (one per step that
+ * executed it). Lookups walk the current ancestor path from HEAD and return
+ * the most recent result visible on that branch, so checking out an earlier
+ * step exposes the results that were live at that point.
+ */
+export class WorkflowResultState {
+  // operatorId -> (stepId -> snapshot recorded at that step)
+  private results = new Map<string, Map<string, ResultEntry>>();
+
+  constructor(private getAncestorPath: () => string[]) {}
+
+  /** Walks `path` from its last element backwards and returns the first snapshot found. */
+  private static newestOnPath(versions: Map<string, ResultEntry>, path: string[]): ResultEntry | undefined {
+    for (let i = path.length - 1; i >= 0; i--) {
+      const entry = versions.get(path[i]);
+      if (entry) return entry;
+    }
+    return undefined;
+  }
+
+  /** Stores the snapshot of `operatorId` for `stepId`, replacing any earlier one for that step. */
+  set(operatorId: string, stepId: string, operatorInfo: OperatorInfo): void {
+    const versions = this.results.get(operatorId) ?? new Map<string, ResultEntry>();
+    this.results.set(operatorId, versions);
+    versions.set(stepId, { operatorInfo, stepId });
+  }
+
+  /** Most recent snapshot of `operatorId` on the current ancestor path, if any. */
+  get(operatorId: string): ResultEntry | undefined {
+    const versions = this.results.get(operatorId);
+    return versions ? WorkflowResultState.newestOnPath(versions, this.getAncestorPath()) : undefined;
+  }
+
+  /** Same lookup as `get`, returning only the operator info. */
+  getOperatorInfo(operatorId: string): OperatorInfo | undefined {
+    return this.get(operatorId)?.operatorInfo;
+  }
+
+  /** For every operator with a snapshot on the current ancestor path, its most recent one. */
+  getAllVisible(): Map<string, ResultEntry> {
+    const visible = new Map<string, ResultEntry>();
+    const path = this.getAncestorPath();
+
+    this.results.forEach((versions, operatorId) => {
+      const entry = WorkflowResultState.newestOnPath(versions, path);
+      if (entry) {
+        visible.set(operatorId, entry);
+      }
+    });
+    return visible;
+  }
+
+  /** Drops every stored snapshot. */
+  clear(): void {
+    this.results.clear();
+  }
+}
diff --git a/agent-service/src/types/index.ts
b/agent-service/src/api/execution-api.ts
similarity index 66%
copy from agent-service/src/types/index.ts
copy to agent-service/src/api/execution-api.ts
index 1d57f2f328..4692a61d75 100644
--- a/agent-service/src/types/index.ts
+++ b/agent-service/src/api/execution-api.ts
@@ -17,5 +17,22 @@
* under the License.
*/
-export * from "./workflow";
-export * from "./agent";
+// A directed edge of a logical plan. Port ids are numbers, not port-id strings.
+export interface LogicalLink {
+ fromOpId: string;
+ fromPortId: { id: number; internal: boolean };
+ toOpId: string;
+ toPortId: { id: number; internal: boolean };
+}
+
+// An operator entry of a logical plan: id and type are required, any further keys are allowed.
+interface LogicalOperator {
+ operatorID: string;
+ operatorType: string;
+ [key: string]: any;
+}
+
+// The plan payload: operators and the links between them.
+// NOTE(review): opsToViewResult / opsToReuseResult are presumably lists of operator ids whose
+// results should be returned / reused; confirm against the execution endpoint's contract.
+export interface LogicalPlan {
+ operators: LogicalOperator[];
+ links: LogicalLink[];
+ opsToViewResult?: string[];
+ opsToReuseResult?: string[];
+}
diff --git a/agent-service/src/api/index.ts b/agent-service/src/api/index.ts
index b4d3cff362..eca292d7ff 100644
--- a/agent-service/src/api/index.ts
+++ b/agent-service/src/api/index.ts
@@ -18,6 +18,7 @@
*/
export * from "./backend-api";
+export * from "./execution-api";
export * from "./workflow-api";
export * from "./auth-api";
export * from "./compile-api";
diff --git a/agent-service/src/index.ts b/agent-service/src/index.ts
index 7e60920b32..152a2d2170 100644
--- a/agent-service/src/index.ts
+++ b/agent-service/src/index.ts
@@ -19,6 +19,7 @@
export * from "./types";
export { WorkflowState } from "./agent/workflow-state";
+export { WorkflowResultState } from "./agent/workflow-result-state";
export { WorkflowSystemMetadata } from "./agent/util/workflow-system-metadata";
export * from "./agent/tools";
export { TexeraAgent, type TexeraAgentConfig, type AgentMessageResult } from
"./agent/texera-agent";
diff --git a/agent-service/src/server.ts b/agent-service/src/server.ts
index d24f0b1bbb..d515931cc1 100644
--- a/agent-service/src/server.ts
+++ b/agent-service/src/server.ts
@@ -80,6 +80,7 @@ async function createAgentInstance(
userInfo: delegateConfig.userInfo,
workflowId: delegateConfig.workflowId,
workflowName: delegateConfig.workflowName,
+ computingUnitId: delegateConfig.computingUnitId,
});
log.info({ agentId, workflowId: delegateConfig.workflowId }, "loaded
workflow for agent");
@@ -121,6 +122,7 @@ function getAgentInfo(agentId: string, agent: TexeraAgent):
AgentInfo {
userInfo: delegateConfig.userInfo,
workflowId: delegateConfig.workflowId,
workflowName: delegateConfig.workflowName,
+ computingUnitId: delegateConfig.computingUnitId,
}
: undefined,
settings: settingsApi,
@@ -165,7 +167,7 @@ const agentsRouter = new Elysia({ prefix: "/agents" })
.post(
"/",
async ({ body }) => {
- const { modelType, name, userToken, workflowId, settings } = body as
CreateAgentRequest;
+ const { modelType, name, userToken, workflowId, computingUnitId,
settings } = body as CreateAgentRequest;
if (!modelType) {
throw new Error("modelType is required");
@@ -182,6 +184,7 @@ const agentsRouter = new Elysia({ prefix: "/agents" })
userToken,
userInfo,
workflowId,
+ computingUnitId,
};
}
@@ -218,6 +221,7 @@ const agentsRouter = new Elysia({ prefix: "/agents" })
name: t.Optional(t.String()),
userToken: t.Optional(t.String()),
workflowId: t.Optional(t.Number()),
+ computingUnitId: t.Optional(t.Number()),
settings: t.Optional(
t.Object({
maxOperatorResultCharLimit: t.Optional(t.Number()),
@@ -429,11 +433,30 @@ interface WsOutgoingMessage {
workflowContent?: any;
}
-// Without execution tools, agents never produce operator results. The route
-// and the WS payload field stay so the frontend keeps working; they just
-// always carry an empty object on the framework-only build.
-function getOperatorResultSummaries(_agent: TexeraAgent): Record<string,
OperatorResultSummaryWs> {
- return {};
+/**
+ * Builds the per-operator result summary map sent to the frontend.
+ *
+ * Only operators returned by the agent's workflow result state `getAllVisible()` are included.
+ * `outputColumns` is derived from the first sample row and counts real columns only: the
+ * bookkeeping keys the backend adds to rows (`__row_index__`, `__is_visualization__`) are ignored.
+ *
+ * @param agent agent whose workflow result state is summarized
+ * @returns map from operator id to its summary; empty when nothing is visible
+ */
+function getOperatorResultSummaries(agent: TexeraAgent): Record<string, OperatorResultSummaryWs> {
+  // Keys injected into result rows for bookkeeping; they are not operator output columns.
+  const internalKeys = new Set(["__row_index__", "__is_visualization__"]);
+  const summaries: Record<string, OperatorResultSummaryWs> = {};
+  for (const [opId, entry] of agent.getWorkflowResultState().getAllVisible()) {
+    const info = entry.operatorInfo;
+    // Guard against an empty result as well as a missing first row.
+    const firstRow = info.result?.[0];
+    summaries[opId] = {
+      state: info.state,
+      inputTuples: info.inputTuples,
+      outputTuples: info.outputTuples,
+      inputPortShapes: info.inputPortShapes,
+      outputColumns: firstRow
+        ? Object.keys(firstRow).filter(key => !internalKeys.has(key)).length
+        : undefined,
+      error: info.error,
+      warnings: info.warnings,
+      consoleLogCount: info.consoleLogs?.length,
+      totalRowCount: info.totalRowCount,
+      sampleRecords: info.result,
+      resultStatistics: info.resultStatistics,
+    };
+  }
+  return summaries;
}
function broadcastToAgent(agentId: string, message: WsOutgoingMessage): void {
diff --git a/agent-service/src/types/agent.ts b/agent-service/src/types/agent.ts
index 7c865d932a..6e19cb7b14 100644
--- a/agent-service/src/types/agent.ts
+++ b/agent-service/src/types/agent.ts
@@ -111,6 +111,7 @@ export interface AgentDelegateConfig {
userInfo?: UserInfo;
workflowId?: number;
workflowName?: string;
+ computingUnitId?: number;
}
export interface AgentSettingsApi {
@@ -139,6 +140,7 @@ export interface CreateAgentRequest {
name?: string;
userToken?: string;
workflowId?: number;
+ computingUnitId?: number;
settings?: AgentSettingsApi;
}
diff --git a/agent-service/src/types/index.ts
b/agent-service/src/types/execution.ts
similarity index 52%
copy from agent-service/src/types/index.ts
copy to agent-service/src/types/execution.ts
index 1d57f2f328..f93be5c583 100644
--- a/agent-service/src/types/index.ts
+++ b/agent-service/src/types/execution.ts
@@ -17,5 +17,37 @@
* under the License.
*/
-export * from "./workflow";
-export * from "./agent";
+/**
+ * One console message emitted by an operator during execution.
+ *
+ * Mirrors the backend `ConsoleMessageInfo` payload, which carries `msgType`, `title` and
+ * `message`. `title` is optional here so existing producers of this type keep compiling.
+ */
+interface ConsoleMessage {
+  msgType: string;
+  // The backend notes that Scala operators put the error text in `title`, Python ones in `message`.
+  title?: string;
+  message: string;
+}
+
+// Size of the data that arrived on one input port of an operator.
+// NOTE(review): the backend PortShape in SyncExecutionResource.scala carries only
+// `portIndex` and `rows`; `columns` is required here — confirm where it gets populated.
+interface PortShape {
+ portIndex: number;
+ rows: number;
+ columns: number;
+}
+
+// Per-operator outcome of a synchronous execution: state, tuple counts, an optional
+// sample of result rows, console output and any extracted error / warnings.
+// NOTE(review): `resultStatistics` has no counterpart in the backend OperatorInfo case
+// class — presumably filled in on the agent-service side; confirm.
+export interface OperatorInfo {
+ state: string;
+ inputTuples: number;
+ outputTuples: number;
+ inputPortShapes?: PortShape[];
+ resultMode: string;
+ result?: Record<string, any>[];
+ totalRowCount?: number;
+ displayedRows?: number;
+ truncated?: boolean;
+ consoleLogs?: ConsoleMessage[];
+ error?: string;
+ warnings?: string[];
+ resultStatistics?: Record<string, string>;
+}
+
+// Response of the synchronous execution endpoint: overall success flag and state name,
+// per-operator details keyed by operator id, plus optional compilation / fatal errors.
+export interface SyncExecutionResult {
+ success: boolean;
+ state: string;
+ operators: Record<string, OperatorInfo>;
+ compilationErrors?: Record<string, string>;
+ errors?: string[];
+}
diff --git a/agent-service/src/types/index.ts b/agent-service/src/types/index.ts
index 1d57f2f328..c6d7291e51 100644
--- a/agent-service/src/types/index.ts
+++ b/agent-service/src/types/index.ts
@@ -18,4 +18,5 @@
*/
export * from "./workflow";
+export * from "./execution";
export * from "./agent";
diff --git
a/amber/src/main/scala/org/apache/texera/web/ComputingUnitMaster.scala
b/amber/src/main/scala/org/apache/texera/web/ComputingUnitMaster.scala
index 4b07113b1c..d0162b176d 100644
--- a/amber/src/main/scala/org/apache/texera/web/ComputingUnitMaster.scala
+++ b/amber/src/main/scala/org/apache/texera/web/ComputingUnitMaster.scala
@@ -47,7 +47,11 @@ import org.apache.texera.dao.SqlServer
import org.apache.texera.dao.jooq.generated.tables.pojos.WorkflowExecutions
import org.apache.texera.web.auth.JwtAuth.setupJwtAuth
import
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
-import org.apache.texera.web.resource.{WebsocketPayloadSizeTuner,
WorkflowWebsocketResource}
+import org.apache.texera.web.resource.{
+ SyncExecutionResource,
+ WebsocketPayloadSizeTuner,
+ WorkflowWebsocketResource
+}
import org.apache.texera.web.service.ExecutionsMetadataPersistService
import org.eclipse.jetty.server.session.SessionHandler
import org.eclipse.jetty.servlet.FilterHolder
@@ -189,6 +193,7 @@ class ComputingUnitMaster extends
io.dropwizard.Application[Configuration] with
}
environment.jersey.register(classOf[WorkflowExecutionsResource])
+ environment.jersey.register(classOf[SyncExecutionResource])
// Route request logs through SLF4J, controlled by
TEXERA_SERVICE_LOG_LEVEL.
// TODO: replace with RequestLoggingFilter.register() from common/auth
once Dropwizard is upgraded to 4.x
diff --git
a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala
b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala
new file mode 100644
index 0000000000..489c9016d5
--- /dev/null
+++
b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala
@@ -0,0 +1,900 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.resource
+
+import com.fasterxml.jackson.databind.node.ObjectNode
+import com.typesafe.scalalogging.LazyLogging
+import io.dropwizard.auth.Auth
+import org.apache.texera.amber.config.ApplicationConfig
+import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.core.storage.model.VirtualDocument
+import org.apache.texera.amber.core.tuple.Tuple
+import org.apache.texera.amber.core.virtualidentity.{
+ ExecutionIdentity,
+ OperatorIdentity,
+ WorkflowIdentity
+}
+import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext,
WorkflowSettings}
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
+ ConsoleMessage,
+ ConsoleMessageType
+}
+import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
+import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState._
+import org.apache.texera.amber.engine.common.executionruntimestate.{
+ ExecutionConsoleStore,
+ ExecutionMetadataStore,
+ ExecutionStatsStore
+}
+import io.reactivex.rxjava3.core.Observable
+import org.apache.texera.auth.SessionUser
+import org.apache.texera.dao.SqlServer
+import org.apache.texera.dao.jooq.generated.Tables.OPERATOR_EXECUTIONS
+import org.apache.texera.web.model.websocket.request.{LogicalPlanPojo,
WorkflowExecuteRequest}
+import org.apache.texera.workflow.{LogicalLink, WorkflowCompiler}
+import
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
+import org.apache.texera.web.service.{ExecutionResultService, WorkflowService}
+import org.apache.texera.web.storage.ExecutionStateStore.updateWorkflowState
+
+import java.net.URI
+import java.util.concurrent.TimeUnit
+import javax.annotation.security.RolesAllowed
+import javax.ws.rs._
+import javax.ws.rs.core.MediaType
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+import com.fasterxml.jackson.databind.ObjectMapper
+
+/**
+ * JSON request body of the synchronous execution endpoint.
+ *
+ * @param executionName name forwarded to the underlying WorkflowExecuteRequest
+ * @param logicalPlan operators and links to execute
+ * @param workflowSettings optional settings; a default data-transfer batch size is used when absent
+ * @param targetOperatorIds operators whose results are collected; exactly one id restricts the
+ * run to that operator's upstream sub-DAG, an empty list reports every operator with stats
+ * @param timeoutSeconds maximum time to wait for the execution to reach a stopping condition
+ * @param maxOperatorResultCharLimit requested per-operator result budget (capped server-side)
+ * @param maxOperatorResultCellCharLimit requested per-cell text budget (capped server-side)
+ */
+case class SyncExecutionRequest(
+ executionName: String,
+ logicalPlan: LogicalPlanPojo,
+ workflowSettings: Option[WorkflowSettings],
+ targetOperatorIds: List[String],
+ timeoutSeconds: Int,
+ maxOperatorResultCharLimit: Int,
+ maxOperatorResultCellCharLimit: Int
+)
+
+/**
+ * Serializable view of one operator console message.
+ *
+ * @param msgType name of the console message type (e.g. "ERROR")
+ * @param title message title
+ * @param message message body
+ */
+case class ConsoleMessageInfo(
+ msgType: String,
+ title: String,
+ message: String
+)
+
+/**
+ * Tuple count observed on one input port of an operator.
+ *
+ * @param portIndex id of the input port
+ * @param rows number of tuples counted on that port
+ */
+case class PortShape(
+ portIndex: Int,
+ rows: Long
+)
+
+/**
+ * Per-operator outcome returned by the synchronous execution endpoint.
+ *
+ * `result` holds the sampled rows as JSON objects; `totalRowCount`, `displayedRows` and
+ * `truncated` describe how that sample relates to the full stored result. `error` and
+ * `warnings` are extracted from `consoleLogs`.
+ */
+case class OperatorInfo(
+ state: String,
+ inputTuples: Long,
+ outputTuples: Long,
+ inputPortShapes: Option[List[PortShape]],
+ resultMode: String, // "table" or "visualization"
+ result: Option[Any], // JSON array (List[ObjectNode])
+ totalRowCount: Option[Int],
+ displayedRows: Option[Int],
+ truncated: Option[Boolean],
+ consoleLogs: Option[List[ConsoleMessageInfo]],
+ error: Option[String],
+ warnings: Option[List[String]]
+)
+
+/**
+ * Response body of the synchronous execution endpoint.
+ *
+ * @param success true only when the run completed and no reported operator carries an error
+ * @param state human-readable final state (e.g. "Completed", "Failed", "Killed", "Error")
+ * @param operators per-operator details keyed by operator id
+ * @param compilationErrors error messages when the failure was classified as a compilation error
+ * @param errors fatal or infrastructure error messages, if any
+ */
+case class SyncExecutionResult(
+ success: Boolean,
+ state: String,
+ operators: Map[String, OperatorInfo],
+ compilationErrors: Option[Map[String, String]],
+ errors: Option[List[String]]
+)
+
+// Why the synchronous wait stopped. Each case keeps the store snapshot that triggered it:
+// - TerminalStateReached: the workflow reached COMPLETED / FAILED / KILLED / TERMINATED
+// - ConsoleErrorDetected: some operator logged an ERROR console message
+// - TargetResultsReady: every requested target operator reported COMPLETED
+sealed trait TerminationReason
+case class TerminalStateReached(state: ExecutionMetadataStore) extends
TerminationReason
+case class ConsoleErrorDetected(consoleState: ExecutionConsoleStore) extends
TerminationReason
+case class TargetResultsReady(statsState: ExecutionStatsStore) extends
TerminationReason
+
+@Path("/execution")
+@Consumes(Array(MediaType.APPLICATION_JSON))
+@Produces(Array(MediaType.APPLICATION_JSON))
+class SyncExecutionResource extends LazyLogging {
+
+ // Hard caps applied regardless of request — guard against runaway payloads.
+ // The per-request limits are reduced to these values when they ask for more:
+ // one bounds the serialized size of an operator's sampled result, the other a single text cell.
+ private val MAX_OPERATOR_RESULT_CHARS = 100000
+ private val MAX_OPERATOR_RESULT_CELL_CHARS = 20000
+
+  /**
+    * Runs a workflow and blocks until it stops, then returns per-operator results.
+    *
+    * The wait ends on the first of: the workflow reaching a terminal state, any operator logging
+    * an ERROR console message, or every requested target operator reporting COMPLETED. When none
+    * of these happens within `request.timeoutSeconds`, the execution is killed and a "Killed"
+    * result is returned.
+    *
+    * @param workflowId id of the workflow being executed
+    * @param computingUnitId id of the computing unit that runs the execution
+    * @param request plan, targets, timeout and result-size limits
+    * @param user authenticated caller
+    * @return the execution outcome; failures are reported in the result rather than thrown
+    */
+  @POST
+  @Path("/{wid}/{cuid}/run")
+  @RolesAllowed(Array("REGULAR", "ADMIN"))
+  def executeWorkflowSync(
+      @PathParam("wid") workflowId: Long,
+      @PathParam("cuid") computingUnitId: Int,
+      request: SyncExecutionRequest,
+      @Auth user: SessionUser
+  ): SyncExecutionResult = {
+    val timeoutSeconds = request.timeoutSeconds
+
+    // Clamp into [0, hard cap]: a negative limit would otherwise break result truncation.
+    val maxOperatorResultCharLimit =
+      Math.max(0, Math.min(request.maxOperatorResultCharLimit, MAX_OPERATOR_RESULT_CHARS))
+    val maxOperatorResultCellCharLimit =
+      Math.max(0, Math.min(request.maxOperatorResultCellCharLimit, MAX_OPERATOR_RESULT_CELL_CHARS))
+
+    logger.info(
+      s"Starting sync execution for workflow $workflowId with limits: " +
+        s"maxOperatorResultCharLimit=${request.maxOperatorResultCharLimit} (capped to $maxOperatorResultCharLimit), " +
+        s"maxOperatorResultCellCharLimit=${request.maxOperatorResultCellCharLimit} (capped to $maxOperatorResultCellCharLimit)"
+    )
+
+    // blockingGet() rethrows checked exceptions wrapped in a RuntimeException, so the
+    // TimeoutException raised by timeout() usually arrives as the cause, not as the exception.
+    def isTimeout(error: Throwable): Boolean =
+      error.isInstanceOf[java.util.concurrent.TimeoutException] ||
+        Option(error.getCause).exists(_.isInstanceOf[java.util.concurrent.TimeoutException])
+
+    try {
+      val workflowService = WorkflowService.getOrCreate(
+        WorkflowIdentity(workflowId),
+        computingUnitId
+      )
+
+      shutdownPreviousExecution(workflowService)
+
+      // "Execute To" semantics: when a single target is given, run only its upstream sub-DAG.
+      val effectiveLogicalPlan =
+        computeSubDAGIfNeeded(request.logicalPlan, request.targetOperatorIds)
+
+      val executeRequest = WorkflowExecuteRequest(
+        executionName = request.executionName,
+        engineVersion = "1.0",
+        logicalPlan = effectiveLogicalPlan,
+        replayFromExecution = None,
+        workflowSettings = request.workflowSettings
+          .getOrElse(
+            WorkflowSettings(dataTransferBatchSize = ApplicationConfig.defaultDataTransferBatchSize)
+          ),
+        emailNotificationEnabled = false,
+        computingUnitId = computingUnitId
+      )
+
+      workflowService.initExecutionService(
+        executeRequest,
+        Some(user.getUser),
+        new URI(s"sync-execution://$workflowId")
+      )
+
+      val executionService = workflowService.executionService.getValue
+      if (executionService == null) {
+        return SyncExecutionResult(
+          success = false,
+          state = "Error",
+          operators = Map.empty,
+          compilationErrors = None,
+          errors = Some(List("Failed to initialize execution service"))
+        )
+      }
+
+      // Snapshot before subscribing — handles the race where a fast execution finishes
+      // before the Observable below sees any state change.
+      val currentState = executionService.executionStateStore.metadataStore.getState
+      val currentConsoleState = executionService.executionStateStore.consoleStore.getState
+      val currentStatsState = executionService.executionStateStore.statsStore.getState
+
+      // Require COMPLETED, not just "has output", so upstream operators finish flushing
+      // their data downstream before we tear the execution down.
+      def allTargetsCompleted(stats: ExecutionStatsStore): Boolean = {
+        request.targetOperatorIds.nonEmpty && request.targetOperatorIds.forall { opId =>
+          stats.operatorInfo.get(opId).exists { metrics =>
+            metrics.operatorState == COMPLETED
+          }
+        }
+      }
+
+      val terminationReason: TerminationReason =
+        if (isTerminalState(currentState.state)) {
+          TerminalStateReached(currentState)
+        } else if (hasConsoleError(currentConsoleState)) {
+          ConsoleErrorDetected(currentConsoleState)
+        } else if (allTargetsCompleted(currentStatsState)) {
+          TargetResultsReady(currentStatsState)
+        } else {
+          val terminalStateObservable: Observable[TerminationReason] =
+            executionService.executionStateStore.metadataStore.getStateObservable
+              .filter((state: ExecutionMetadataStore) => isTerminalState(state.state))
+              .map[TerminationReason](state => TerminalStateReached(state))
+
+          val consoleErrorObservable: Observable[TerminationReason] =
+            executionService.executionStateStore.consoleStore.getStateObservable
+              .filter((consoleState: ExecutionConsoleStore) => hasConsoleError(consoleState))
+              .map[TerminationReason](consoleState => ConsoleErrorDetected(consoleState))
+
+          val targetResultsObservable: Observable[TerminationReason] =
+            executionService.executionStateStore.statsStore.getStateObservable
+              .filter((stats: ExecutionStatsStore) => allTargetsCompleted(stats))
+              .map[TerminationReason](stats => TargetResultsReady(stats))
+
+          try {
+            Observable
+              .amb(
+                java.util.Arrays.asList(
+                  terminalStateObservable,
+                  consoleErrorObservable,
+                  targetResultsObservable
+                )
+              )
+              .firstOrError()
+              .timeout(timeoutSeconds.toLong, TimeUnit.SECONDS)
+              .blockingGet()
+          } catch {
+            case e: Exception if isTimeout(e) =>
+              killExecution(executionService)
+              return SyncExecutionResult(
+                success = false,
+                state = "Killed",
+                operators = Map.empty,
+                compilationErrors = None,
+                errors = Some(List(s"Timeout after $timeoutSeconds seconds"))
+              )
+            case e: Exception =>
+              logger.error(s"Error waiting for execution: ${e.getMessage}", e)
+              return SyncExecutionResult(
+                success = false,
+                state = "Error",
+                operators = Map.empty,
+                compilationErrors = None,
+                // getMessage may be null; never put a null entry into the error list.
+                errors = Some(List(Option(e.getMessage).getOrElse("Unknown error")))
+              )
+          }
+        }
+
+      val (finalState, terminatedByConsoleError, terminatedByTargetResults) =
+        terminationReason match {
+          case TerminalStateReached(state) =>
+            (state, false, false)
+          case ConsoleErrorDetected(_) =>
+            killExecution(executionService)
+            (executionService.executionStateStore.metadataStore.getState, true, false)
+          case TargetResultsReady(_) =>
+            // RegionExecutionCoordinator caches upstream results asynchronously after operators
+            // complete; sleep gives that caching a chance to finish before we shut down the client.
+            // TODO: replace with a synchronous signal from the engine.
+            Thread.sleep(500)
+            killExecution(executionService)
+            // Override to COMPLETED — we have everything we asked for, even though the engine
+            // sees this as a kill.
+            executionService.executionStateStore.metadataStore.updateState(metadataStore =>
+              updateWorkflowState(COMPLETED, metadataStore)
+            )
+            (executionService.executionStateStore.metadataStore.getState, false, true)
+        }
+
+      // Let the result writer flush before we read storage.
+      Thread.sleep(500)
+
+      // Console DB writes lag the in-memory store; pass the latter so error extraction
+      // can fall back when the row hasn't landed yet.
+      val inMemoryConsoleState = terminationReason match {
+        case ConsoleErrorDetected(consoleState) => Some(consoleState)
+        case _                                  => None
+      }
+
+      val executionId = executionService.workflowContext.executionId
+      val operatorInfos = collectOperatorInfos(
+        executionId,
+        executionService,
+        request.targetOperatorIds,
+        maxOperatorResultCharLimit,
+        maxOperatorResultCellCharLimit,
+        inMemoryConsoleState
+      )
+
+      val fatalErrors = finalState.fatalErrors
+        .map(err => s"${err.`type`}: ${err.message}")
+        .toList
+
+      val hasOperatorConsoleError = operatorInfos.values.exists(_.error.isDefined)
+
+      val stateString =
+        if (terminatedByConsoleError) "Failed"
+        else if (terminatedByTargetResults) "Completed"
+        else stateToString(finalState.state)
+
+      val isSuccess = (finalState.state == COMPLETED || terminatedByTargetResults) &&
+        !hasOperatorConsoleError && !terminatedByConsoleError
+
+      SyncExecutionResult(
+        success = isSuccess,
+        state = stateString,
+        operators = operatorInfos,
+        compilationErrors = None,
+        errors = if (fatalErrors.nonEmpty) Some(fatalErrors) else None
+      )
+
+    } catch {
+      case e: Exception =>
+        logger.error(s"Sync execution error: ${e.getMessage}", e)
+        handleExecutionError(e)
+    }
+  }
+
+  /**
+    * Best-effort shutdown of the client left over from an earlier execution of this workflow.
+    * Does nothing when there is no previous execution service or it has no client; failures
+    * are logged as warnings and never propagated.
+    */
+  private def shutdownPreviousExecution(workflowService: WorkflowService): Unit = {
+    try {
+      Option(workflowService.executionService.getValue)
+        .filter(_.client != null)
+        .foreach { staleExecution =>
+          logger.info(s"Shutting down previous execution client")
+          staleExecution.client.shutdown()
+        }
+    } catch {
+      case e: Exception =>
+        logger.warn(s"Error shutting down previous execution client: ${e.getMessage}")
+    }
+  }
+
+  /**
+    * Stops a running execution: shuts down its client (if any), stamps the end time on the
+    * stats store and marks the workflow state as KILLED. Failures are logged as warnings and
+    * swallowed so callers can always continue building their response.
+    */
+  private def killExecution(
+      executionService: org.apache.texera.web.service.WorkflowExecutionService
+  ): Unit = {
+    try {
+      Option(executionService.client).foreach(_.shutdown())
+
+      val finishedAt = (stats: ExecutionStatsStore) =>
+        stats.withEndTimeStamp(System.currentTimeMillis())
+      executionService.executionStateStore.statsStore.updateState(finishedAt)
+
+      val markKilled = (metadata: ExecutionMetadataStore) => updateWorkflowState(KILLED, metadata)
+      executionService.executionStateStore.metadataStore.updateState(markKilled)
+    } catch {
+      case e: Exception =>
+        logger.warn(s"Error killing execution: ${e.getMessage}")
+    }
+  }
+
+  /**
+    * Builds the per-operator section of the response.
+    *
+    * Reported operators are the requested targets (or every operator with runtime stats when no
+    * target was given) plus any operator that logged an ERROR console message in the supplied
+    * in-memory console state, so the caller can see why an upstream operator failed.
+    *
+    * @param executionId execution whose stored results and console logs are read
+    * @param executionService source of the in-memory runtime statistics
+    * @param targetOperatorIds requested operators; empty means "all operators with stats"
+    * @param maxOperatorResultCharLimit size budget for each operator's sampled result
+    * @param maxOperatorResultCellCharLimit size budget for a single text cell
+    * @param inMemoryConsoleState console snapshot used when the persisted console logs are missing
+    * @return operator id -> collected info
+    */
+  private def collectOperatorInfos(
+      executionId: ExecutionIdentity,
+      executionService: org.apache.texera.web.service.WorkflowExecutionService,
+      targetOperatorIds: List[String],
+      maxOperatorResultCharLimit: Int,
+      maxOperatorResultCellCharLimit: Int,
+      inMemoryConsoleState: Option[ExecutionConsoleStore] = None
+  ): Map[String, OperatorInfo] = {
+    val operatorStats = executionService.executionStateStore.statsStore.getState.operatorInfo
+
+    val baseTargetOps =
+      if (targetOperatorIds.nonEmpty) targetOperatorIds else operatorStats.keys.toList
+
+    // Pull in any operator that logged a console error even if it isn't a target —
+    // otherwise the caller can't see why an upstream op failed. Operators that only
+    // printed non-error output are not added.
+    val consoleErrorOps = inMemoryConsoleState
+      .map { consoleState =>
+        consoleState.operatorConsole.collect {
+          case (opId, opConsole)
+              if opConsole.consoleMessages.exists(_.msgType == ConsoleMessageType.ERROR) =>
+            opId
+        }.toList
+      }
+      .getOrElse(List.empty)
+
+    val targetOps = (baseTargetOps ++ consoleErrorOps).distinct
+
+    targetOps.map { opId =>
+      val stats = operatorStats.get(opId)
+      val (state, inputTuples, outputTuples): (String, Long, Long) = stats match {
+        case Some(s) =>
+          val inputCount = s.operatorStatistics.inputMetrics.map(_.tupleMetrics.count).sum
+          val outputCount = s.operatorStatistics.outputMetrics.map(_.tupleMetrics.count).sum
+          (stateToString(s.operatorState), inputCount, outputCount)
+        case None => ("Unknown", 0L, 0L)
+      }
+
+      val inputPortShapes: Option[List[PortShape]] = stats
+        .map { s =>
+          s.operatorStatistics.inputMetrics.map { pm =>
+            PortShape(pm.portId.id, pm.tupleMetrics.count)
+          }.toList
+        }
+        .filter(_.nonEmpty)
+
+      val (resultMode, result, totalRowCount, displayedRows, truncated) =
+        collectOperatorResult(
+          executionId,
+          opId,
+          maxOperatorResultCharLimit,
+          maxOperatorResultCellCharLimit
+        )
+
+      // DB is authoritative once written; fall back to in-memory state for in-flight runs
+      // where the console row hasn't been persisted yet.
+      val consoleLogs = collectConsoleLogs(executionId, opId).orElse {
+        inMemoryConsoleState.flatMap { consoleState =>
+          consoleState.operatorConsole
+            .get(opId)
+            .map { opConsole =>
+              opConsole.consoleMessages.map { msg =>
+                ConsoleMessageInfo(
+                  msgType = msg.msgType.name,
+                  title = msg.title,
+                  message = msg.message
+                )
+              }.toList
+            }
+            .filter(_.nonEmpty)
+        }
+      }
+
+      // Python writes the full error text to `message`; Scala writes it to `title`
+      // (with a stack trace in `message`). Pick whichever is longer to avoid losing detail.
+      val errorMsg = consoleLogs.flatMap(
+        _.find(_.msgType == "ERROR").map { e =>
+          if (e.message.nonEmpty && e.message.length > e.title.length) e.message
+          else e.title
+        }
+      )
+
+      // Convention: messages whose title is prefixed with "WARNING: " surface as warnings.
+      val warningMsgs = consoleLogs
+        .map(_.filter(_.title.startsWith("WARNING: ")).map(_.title))
+        .filter(_.nonEmpty)
+
+      opId -> OperatorInfo(
+        state = state,
+        inputTuples = inputTuples,
+        outputTuples = outputTuples,
+        inputPortShapes = inputPortShapes,
+        resultMode = resultMode,
+        result = result,
+        totalRowCount = totalRowCount,
+        displayedRows = displayedRows,
+        truncated = truncated,
+        consoleLogs = consoleLogs,
+        error = errorMsg,
+        warnings = warningMsgs
+      )
+    }.toMap
+  }
+
+  /**
+    * Converts an exception raised while setting up or running the execution into a failed
+    * result. A message containing one of the compilation keywords is reported as
+    * "CompilationFailed"; anything else (including a missing message) becomes a generic "Error".
+    */
+  private def handleExecutionError(e: Exception): SyncExecutionResult = {
+    val compilationKeywords = List("compilation", "Compilation", "operator", "schema")
+    val compilationMessage =
+      Option(e.getMessage).filter(text => compilationKeywords.exists(text.contains))
+
+    compilationMessage match {
+      case Some(errorMsg) =>
+        SyncExecutionResult(
+          success = false,
+          state = "CompilationFailed",
+          operators = Map.empty,
+          compilationErrors = Some(Map("error" -> errorMsg)),
+          errors = Some(List(errorMsg))
+        )
+      case None =>
+        val description = Option(e.getMessage).getOrElse("Unknown error")
+        SyncExecutionResult(
+          success = false,
+          state = "Error",
+          operators = Map.empty,
+          compilationErrors = None,
+          errors = Some(List(description))
+        )
+    }
+  }
+
+  /**
+    * Reads an operator's stored result and returns a size-bounded sample of it.
+    *
+    * Symmetric truncation: rows are taken from the front until half of the character budget is
+    * used; from then on a sliding window keeps only the most recent rows that fit the other half
+    * (minus a small reserve for a "skipped" marker), so the sample shows the head and the tail of
+    * the result. Every row carries `__row_index__` with its original position. A single-row
+    * result with an `html-content` / `json-content` column is returned as a visualization
+    * instead. Serialization to table/toon format happens in agent-service.
+    *
+    * @param executionId execution the result belongs to
+    * @param opId operator whose default output port is read
+    * @param maxOperatorResultCharLimit total character budget for the sampled rows
+    * @param maxOperatorResultCellCharLimit character budget for one text cell
+    * @return (resultMode, rows, totalRowCount, displayedRows, truncated); all optional parts are
+    *         None when the operator has no stored result or reading it failed
+    */
+  private def collectOperatorResult(
+      executionId: ExecutionIdentity,
+      opId: String,
+      maxOperatorResultCharLimit: Int,
+      maxOperatorResultCellCharLimit: Int
+  ): (String, Option[Any], Option[Int], Option[Int], Option[Boolean]) = {
+    try {
+      val storageUriOption = WorkflowExecutionsResource.getResultUriByLogicalPortId(
+        executionId,
+        OperatorIdentity(opId),
+        PortIdentity()
+      )
+
+      storageUriOption match {
+        case None =>
+          ("table", None, None, None, None)
+
+        case Some(storageUri) =>
+          val document = DocumentFactory
+            .openDocument(storageUri)
+            ._1
+            .asInstanceOf[VirtualDocument[Tuple]]
+
+          // Saturate instead of wrapping around when the stored count exceeds Int range.
+          val totalCount = math.min(document.getCount, Int.MaxValue.toLong).toInt
+          val mapper = new ObjectMapper()
+          val tupleIterator = document.get()
+
+          // Converts one tuple to JSON, truncates oversized cells, tags it with its original
+          // position and returns it together with its estimated serialized size.
+          def render(tuple: Tuple, index: Int): (ObjectNode, Int) = {
+            val node = truncateSingleTuple(
+              ExecutionResultService.convertTuplesToJson(List(tuple)).head,
+              maxOperatorResultCellCharLimit
+            )
+            // __row_index__ preserves the original position so the frontend can show
+            // "row N" correctly after symmetric truncation drops the middle.
+            node.put("__row_index__", index)
+            (node, estimateTupleSize(node, mapper))
+          }
+
+          if (totalCount == 0 || !tupleIterator.hasNext) {
+            ("table", Some(List.empty[ObjectNode].asJava), Some(0), Some(0), Some(false))
+          } else {
+            val firstTuple = tupleIterator.next()
+            if (totalCount == 1 && isVisualizationTuple(firstTuple)) {
+              // A single tuple with html-content / json-content is a visualization payload —
+              // the frontend renders it as an iframe rather than a table.
+              val jsonResults =
+                ExecutionResultService.convertTuplesToJson(List(firstTuple), isVisualization = true)
+              jsonResults.foreach(
+                _.asInstanceOf[ObjectNode].put("__is_visualization__", true)
+              )
+              ("visualization", Some(jsonResults), Some(totalCount), Some(1), Some(false))
+            } else {
+              val (firstNode, firstSize) = render(firstTuple, 0)
+              if (firstSize >= maxOperatorResultCharLimit) {
+                // The first row alone exhausts the budget: return it and flag truncation.
+                ("table", Some(List(firstNode).asJava), Some(totalCount), Some(1), Some(true))
+              } else {
+                val halfLimit = maxOperatorResultCharLimit / 2
+                val truncationNoticeSize = 50 // reserved for the "...skipped..." marker
+                val backBudget = halfLimit - truncationNoticeSize
+
+                val frontTuples = mutable.ListBuffer[ObjectNode](firstNode)
+                // Queue gives constant-time eviction of the oldest row in the tail window.
+                val backWindow = mutable.Queue[(ObjectNode, Int)]()
+                var frontSize = firstSize
+                var backSize = 0
+                var frontOpen = true
+                var rowIndex = 0
+
+                while (tupleIterator.hasNext) {
+                  rowIndex += 1
+                  val (node, size) = render(tupleIterator.next(), rowIndex)
+                  // Once a row does not fit the front half, every later row goes to the tail.
+                  frontOpen = frontOpen && frontSize < halfLimit && frontSize + size <= halfLimit
+                  if (frontOpen) {
+                    frontTuples += node
+                    frontSize += size
+                  } else {
+                    backWindow.enqueue((node, size))
+                    backSize += size
+                    // Always keep at least the newest row, even when it alone is over budget.
+                    while (backSize > backBudget && backWindow.size > 1) {
+                      backSize -= backWindow.dequeue()._2
+                    }
+                  }
+                }
+
+                val allTuples = frontTuples.toList ++ backWindow.map(_._1)
+                val truncated = backWindow.nonEmpty && totalCount - allTuples.size > 0
+                (
+                  "table",
+                  Some(allTuples.asJava),
+                  Some(totalCount),
+                  Some(allTuples.size),
+                  Some(truncated)
+                )
+              }
+            }
+          }
+      }
+    } catch {
+      case e: Exception =>
+        logger.warn(s"Error collecting result for operator $opId: ${e.getMessage}", e)
+        ("table", None, None, None, None)
+    }
+  }
+
+  /**
+    * Returns a copy of `tuple` in which every text cell longer than `maxCellChars` is shortened
+    * with [[symmetricTruncateCellValue]]; all other cells are carried over unchanged. The input
+    * node is not modified.
+    *
+    * @param tuple JSON object representing one result row
+    * @param maxCellChars maximum number of characters kept for a text cell
+    * @return a new object node with the same fields in the same order
+    */
+  private def truncateSingleTuple(
+      tuple: ObjectNode,
+      maxCellChars: Int
+  ): ObjectNode = {
+    import com.fasterxml.jackson.databind.node.{JsonNodeFactory, TextNode}
+
+    // Build the copy from the shared node factory; creating an ObjectMapper per row is costly.
+    val truncatedTuple = JsonNodeFactory.instance.objectNode()
+    val fieldNames = tuple.fieldNames()
+
+    while (fieldNames.hasNext) {
+      val fieldName = fieldNames.next()
+      val fieldValue = tuple.get(fieldName)
+      if (fieldValue.isTextual && fieldValue.asText().length > maxCellChars) {
+        val truncatedText = symmetricTruncateCellValue(fieldValue.asText(), maxCellChars)
+        truncatedTuple.set(fieldName, new TextNode(truncatedText))
+      } else {
+        truncatedTuple.set(fieldName, fieldValue)
+      }
+    }
+    truncatedTuple
+  }
+
+  /**
+    * Estimates how many characters `tuple` contributes to a serialized JSON array: the length of
+    * its JSON text plus one for the array separator.
+    */
+  private def estimateTupleSize(
+      tuple: ObjectNode,
+      mapper: ObjectMapper
+  ): Int = {
+    val serialized = mapper.writeValueAsString(tuple)
+    val separatorLength = 1
+    serialized.length + separatorLength
+  }
+
+  /**
+    * Shortens `text` to at most `maxChars` characters, keeping its beginning and its end.
+    *
+    * Text that already fits is returned unchanged. Otherwise the middle is replaced by a
+    * "...[truncated]..." notice; when the budget is too small to hold the notice plus content,
+    * the text is simply cut to its first `maxChars` characters. A negative `maxChars` is treated
+    * as 0 instead of throwing.
+    *
+    * @param text cell value to shorten
+    * @param maxChars maximum length of the returned string
+    * @return the original or shortened text
+    */
+  private def symmetricTruncateCellValue(text: String, maxChars: Int): String = {
+    val notice = "...[truncated]..."
+    val limit = math.max(maxChars, 0)
+    if (text.length <= limit) {
+      text
+    } else if (limit <= notice.length) {
+      // No room for the notice and surrounding content: plain prefix cut.
+      text.substring(0, limit)
+    } else {
+      val halfChars = (limit - notice.length) / 2
+      text.substring(0, halfChars) + notice + text.substring(text.length - halfChars)
+    }
+  }
+
+  /**
+    * Tells whether a tuple is a visualization payload, i.e. its schema has a column named
+    * "html-content" or "json-content". Any failure while inspecting the schema counts as "no".
+    */
+  private def isVisualizationTuple(tuple: Tuple): Boolean = {
+    try {
+      val columnNames = tuple.getSchema.getAttributes.map(_.getName)
+      columnNames.exists {
+        case "html-content" | "json-content" => true
+        case _                               => false
+      }
+    } catch {
+      case _: Exception => false
+    }
+  }
+
+  /**
+    * Loads the persisted console messages of one operator.
+    *
+    * Reading is best-effort: a message that cannot be decoded is skipped, and any failure while
+    * locating or opening the stored log yields None. Both cases are logged so that missing
+    * console output can be diagnosed.
+    *
+    * @param executionId execution the operator ran in
+    * @param opId operator whose console log is read
+    * @return the decoded messages, or None when nothing is stored or nothing could be read
+    */
+  private def collectConsoleLogs(
+      executionId: ExecutionIdentity,
+      opId: String
+  ): Option[List[ConsoleMessageInfo]] = {
+    try {
+      getConsoleMessageUri(executionId, OperatorIdentity(opId)).flatMap { uri =>
+        val document = DocumentFactory
+          .openDocument(uri)
+          ._1
+          .asInstanceOf[VirtualDocument[Tuple]]
+
+        val messages = document.get().toList.flatMap { tuple =>
+          try {
+            // Each stored tuple holds one ASCII-encoded ConsoleMessage in its first field.
+            val msg = ConsoleMessage.fromAscii(tuple.getField[String](0))
+            Some(
+              ConsoleMessageInfo(
+                msgType = msg.msgType.name,
+                title = msg.title,
+                message = msg.message
+              )
+            )
+          } catch {
+            case e: Exception =>
+              logger.debug(
+                s"Skipping unreadable console message for operator $opId: ${e.getMessage}"
+              )
+              None
+          }
+        }
+
+        if (messages.nonEmpty) Some(messages) else None
+      }
+    } catch {
+      case e: Exception =>
+        logger.warn(s"Error collecting console logs for operator $opId: ${e.getMessage}")
+        None
+    }
+  }
+
+  /**
+    * Looks up where the console messages of an operator execution are stored.
+    *
+    * @param eid execution id used to select the OPERATOR_EXECUTIONS row
+    * @param opId operator id used to select the OPERATOR_EXECUTIONS row
+    * @return the stored URI, or None when no row exists or its URI is null / empty
+    */
+  private def getConsoleMessageUri(
+      eid: ExecutionIdentity,
+      opId: OperatorIdentity
+  ): Option[URI] = {
+    val storedUri: String = SqlServer
+      .getInstance()
+      .createDSLContext()
+      .select(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_URI)
+      .from(OPERATOR_EXECUTIONS)
+      .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
+      .and(OPERATOR_EXECUTIONS.OPERATOR_ID.eq(opId.id))
+      .fetchOneInto(classOf[String])
+
+    // Option(...) already maps a null column value / missing row to None.
+    Option(storedUri).filter(_.nonEmpty).map(URI.create)
+  }
+
+  /** True when the workflow can make no further progress: COMPLETED, FAILED, KILLED or TERMINATED. */
+  private def isTerminalState(state: WorkflowAggregatedState): Boolean = {
+    val terminalStates = Set[WorkflowAggregatedState](COMPLETED, FAILED, KILLED, TERMINATED)
+    terminalStates.contains(state)
+  }
+
+  /** True when at least one operator console holds a message of type ERROR. */
+  private def hasConsoleError(consoleState: ExecutionConsoleStore): Boolean = {
+    consoleState.operatorConsole.values.iterator
+      .flatMap(_.consoleMessages)
+      .exists(message => message.msgType == ConsoleMessageType.ERROR)
+  }
+
+  /**
+    * Maps an aggregated state to the capitalized name used in responses; any state without a
+    * dedicated name is reported as "Unknown".
+    */
+  private def stateToString(state: WorkflowAggregatedState): String = {
+    val names = Map[WorkflowAggregatedState, String](
+      UNINITIALIZED -> "Uninitialized",
+      READY -> "Ready",
+      RUNNING -> "Running",
+      PAUSING -> "Pausing",
+      PAUSED -> "Paused",
+      RESUMING -> "Resuming",
+      COMPLETED -> "Completed",
+      FAILED -> "Failed",
+      KILLED -> "Killed",
+      TERMINATED -> "Terminated"
+    )
+    names.getOrElse(state, "Unknown")
+  }
+
+  /**
+    * Restricts the plan to what a single target operator needs.
+    *
+    * With exactly one target that exists in the plan, the result contains the target, every
+    * operator reachable by following links backwards from it, and those links; only the target
+    * is marked for result viewing and `opsToReuseResult` is limited to the kept operators. In
+    * every other case (no target, several targets, unknown target) the plan is returned as is.
+    */
+  private def computeSubDAGIfNeeded(
+      logicalPlan: LogicalPlanPojo,
+      targetOperatorIds: List[String]
+  ): LogicalPlanPojo = {
+    targetOperatorIds match {
+      case targetOpId :: Nil =>
+        val operatorMap: Map[String, LogicalOp] =
+          logicalPlan.operators.map(op => op.operatorIdentifier.id -> op).toMap
+
+        if (!operatorMap.contains(targetOpId)) {
+          logger.warn(s"Target operator $targetOpId not found in logical plan, using full DAG")
+          logicalPlan
+        } else {
+          val incomingLinks: Map[String, List[LogicalLink]] =
+            logicalPlan.links.groupBy(_.toOpId.id)
+
+          val visited = mutable.Set[String]()
+          val keptOperators = mutable.ListBuffer[LogicalOp]()
+          val keptLinks = mutable.ListBuffer[LogicalLink]()
+
+          // Depth-first walk against the link direction; `add` is false for ids already seen.
+          def collectUpstream(currentOpId: String): Unit = {
+            if (visited.add(currentOpId)) {
+              operatorMap.get(currentOpId).foreach { op =>
+                keptOperators += op
+                incomingLinks.getOrElse(currentOpId, List.empty).foreach { link =>
+                  keptLinks += link
+                  collectUpstream(link.fromOpId.id)
+                }
+              }
+            }
+          }
+
+          collectUpstream(targetOpId)
+
+          LogicalPlanPojo(
+            operators = keptOperators.toList,
+            links = keptLinks.toList,
+            opsToViewResult = targetOperatorIds.filter(visited.contains),
+            opsToReuseResult = logicalPlan.opsToReuseResult.filter(visited.contains)
+          )
+        }
+
+      case _ =>
+        logicalPlan
+    }
+  }
+
+  /**
+    * Compiles the plan in a throw-away context to check that it is valid.
+    *
+    * Returns operator-id -> error message; an empty map means compilation succeeded. The operator
+    * id is extracted from the exception message (text following the word "operator", matched
+    * case-insensitively) and keeps its original letter case; when no id can be found the error is
+    * keyed by "workflow".
+    *
+    * Note: this helper is private and not called anywhere in this class at present.
+    */
+  private def validateWorkflow(
+      workflowId: Long,
+      logicalPlan: LogicalPlanPojo
+  ): Map[String, String] = {
+    try {
+      val tempContext = new WorkflowContext(WorkflowIdentity(workflowId))
+      val compiler = new WorkflowCompiler(tempContext)
+      compiler.compile(logicalPlan)
+      Map.empty
+    } catch {
+      case e: Exception =>
+        val errorMsg = Option(e.getMessage).getOrElse("Compilation failed")
+        // Match on the original text: lower-casing the message first would also lower-case the
+        // captured operator id and break lookups against the real id.
+        val operatorIdPattern = """(?i)operator[- ]?(\S+)""".r
+        val operatorId = operatorIdPattern
+          .findFirstMatchIn(errorMsg)
+          .map(_.group(1))
+          .getOrElse("workflow")
+        Map(operatorId -> errorMsg)
+    }
+  }
+
+ // Liveness probe: always returns the constant map status -> ok.
+ // No @RolesAllowed restriction is declared on this endpoint.
+ @GET
+ @Path("/health")
+ def healthCheck: Map[String, String] = Map("status" -> "ok")
+}