This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5294-a8548128615ca5e9402384bf0752328c7e39ab12 in repository https://gitbox.apache.org/repos/asf/texera.git
commit a1045ce55e9169deb94f8ae3bf195692ef994ec5
Author: Kunwoo (Chris) <[email protected]>
AuthorDate: Sat May 30 12:31:06 2026 -0700

fix(frontend): drop stale attribute references when an operator's input schema changes (#5294)

### What changes were proposed in this PR?

Fixes a bug where copy-pasting a schema-propagated Aggregate operator and connecting it to a different upstream operator caused a compile error that persisted even after clearing all properties.

**Why it is a bug**: An Aggregate stores property values that reference input column names — `groupByKeys` and `aggregations[].attribute`. Copy-paste clones `operatorProperties`, so the pasted operator keeps references to the *old* source's columns. When it is wired to a different operator, the backend `AggregateOpDesc` schema propagation calls `inputSchema.getAttribute("<old col>")`, which throws. These old column values were never removed. `DynamicSchemaService.setDynamicSchema` is documented to drop properties invalidated by a new schema, but the code never actually did so.

**What changed:** When schema propagation produces an input-attribute list, `WorkflowCompilingService` now uses a new pure helper, `dropInvalidAttributeValues`, to drop property values that the new schema invalidates:

- `autofill: "attributeName"` -> the value is reset to `""` if it is not in the enum
- `autofill: "attributeNameList"` -> entries that are not in the enum are filtered out
- the helper recurses through nested objects/arrays (so `aggregations[].attribute` is handled) and never mutates the input

The misleading doc comment on `setDynamicSchema` now points to where the contract is fulfilled.

### Any related issues, documentation, discussions?

Resolves #3070.

### How was this PR tested?

Manually tested: https://github.com/user-attachments/assets/d46e6198-10de-4efe-8c34-71981374cde8

### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.8) --- .../workflow-compiling.service.spec.ts | 242 +++++++++++++++++++++ .../compile-workflow/workflow-compiling.service.ts | 83 +++++++ .../dynamic-schema/dynamic-schema.service.ts | 6 +- 3 files changed, 328 insertions(+), 3 deletions(-) diff --git a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.spec.ts b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.spec.ts new file mode 100644 index 0000000000..b456205f22 --- /dev/null +++ b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.spec.ts @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import { JSONSchema7Definition } from "json-schema"; +import { TestBed } from "@angular/core/testing"; +import { WorkflowCompilingService } from "./workflow-compiling.service"; +import { WorkflowActionService } from "../workflow-graph/model/workflow-action.service"; +import { DynamicSchemaService } from "../dynamic-schema/dynamic-schema.service"; +import { ValidationWorkflowService } from "../validation/validation-workflow.service"; +import { OperatorMetadataService } from "../operator-metadata/operator-metadata.service"; +import { StubOperatorMetadataService } from "../operator-metadata/stub-operator-metadata.service"; +import { JointUIService } from "../joint-ui/joint-ui.service"; +import { WorkflowUtilService } from "../workflow-graph/util/workflow-util.service"; +import { UndoRedoService } from "../undo-redo/undo-redo.service"; +import { mockPoint, mockScanPredicate } from "../workflow-graph/model/mock-workflow-data"; +import { serializePortIdentity } from "../../../common/util/port-identity-serde"; +import { commonTestImports, commonTestProviders } from "../../../common/testing/test-utils"; + +describe("WorkflowCompilingService.dropInvalidAttributeValues", () => { + // A schema shaped like the Aggregate operator after schema propagation has filled in the + // valid input attribute names ("col_y" is the only attribute available on the new input). 
+ const aggregateSchema = (): JSONSchema7Definition => + ({ + type: "object", + properties: { + groupByKeys: { + type: "array", + autofill: "attributeNameList", + items: { type: "string", enum: ["col_y", ""] }, + }, + aggregations: { + type: "array", + items: { + type: "object", + properties: { + attribute: { type: "string", autofill: "attributeName", enum: ["col_y"] }, + aggFunction: { type: "string" }, + resultAttribute: { type: "string" }, + }, + }, + }, + }, + }) as unknown as JSONSchema7Definition; + + it("drops list entries and resets single attributes that are no longer valid", () => { + const properties = { + groupByKeys: ["col_x", "col_y"], + aggregations: [{ attribute: "col_x", aggFunction: "sum", resultAttribute: "r" }], + }; + + const { value, changed } = WorkflowCompilingService.dropInvalidAttributeValues(aggregateSchema(), properties); + + expect(changed).toBe(true); + expect(value.groupByKeys).toEqual(["col_y"]); + expect(value.aggregations[0].attribute).toBe(""); + // non-attribute fields are preserved + expect(value.aggregations[0].aggFunction).toBe("sum"); + expect(value.aggregations[0].resultAttribute).toBe("r"); + // the input object is never mutated + expect(properties.groupByKeys).toEqual(["col_x", "col_y"]); + expect(properties.aggregations[0].attribute).toBe("col_x"); + }); + + it("reports no change when all attribute references are valid", () => { + const properties = { + groupByKeys: ["col_y"], + aggregations: [{ attribute: "col_y", aggFunction: "sum", resultAttribute: "r" }], + }; + + const { value, changed } = WorkflowCompilingService.dropInvalidAttributeValues(aggregateSchema(), properties); + + expect(changed).toBe(false); + expect(value).toBe(properties); + }); + + it("makes no change when the input schema (enum) is unknown", () => { + const schemaWithoutEnum: JSONSchema7Definition = { + type: "object", + properties: { + groupByKeys: { + type: "array", + autofill: "attributeNameList", + items: { type: "string" }, + }, + aggregations: 
{ + type: "array", + items: { + type: "object", + properties: { + attribute: { type: "string", autofill: "attributeName" }, + }, + }, + }, + }, + } as unknown as JSONSchema7Definition; + + const properties = { + groupByKeys: ["col_x"], + aggregations: [{ attribute: "col_x" }], + }; + + const { value, changed } = WorkflowCompilingService.dropInvalidAttributeValues(schemaWithoutEnum, properties); + + expect(changed).toBe(false); + expect(value).toBe(properties); + }); + + it("returns the value unchanged for non-object schemas or nullish values", () => { + // boolean schema (e.g. `additionalProperties: true`) + expect(WorkflowCompilingService.dropInvalidAttributeValues(true, { a: 1 })).toEqual({ + value: { a: 1 }, + changed: false, + }); + // null / undefined values are not walked + expect(WorkflowCompilingService.dropInvalidAttributeValues(aggregateSchema(), null)).toEqual({ + value: null, + changed: false, + }); + expect(WorkflowCompilingService.dropInvalidAttributeValues(aggregateSchema(), undefined)).toEqual({ + value: undefined, + changed: false, + }); + }); + + it("skips schema properties that are absent from the value object", () => { + // the value is missing both `groupByKeys` and `aggregations` defined in the schema + const properties = { unrelated: "keep-me" }; + + const { value, changed } = WorkflowCompilingService.dropInvalidAttributeValues(aggregateSchema(), properties); + + expect(changed).toBe(false); + expect(value).toBe(properties); + }); +}); + +describe("WorkflowCompilingService schema propagation property cleanup", () => { + let service: WorkflowCompilingService; + let workflowActionService: WorkflowActionService; + let dynamicSchemaService: DynamicSchemaService; + + beforeEach(() => { + TestBed.configureTestingModule({ + imports: [...commonTestImports], + providers: [ + { provide: OperatorMetadataService, useClass: StubOperatorMetadataService }, + JointUIService, + WorkflowActionService, + WorkflowUtilService, + UndoRedoService, + 
DynamicSchemaService, + ValidationWorkflowService, + WorkflowCompilingService, + ...commonTestProviders, + ], + }); + service = TestBed.inject(WorkflowCompilingService); + workflowActionService = TestBed.inject(WorkflowActionService); + dynamicSchemaService = TestBed.inject(DynamicSchemaService); + }); + + it("drops operator property values that the propagated input schema no longer supports", () => { + const operatorID = mockScanPredicate.operatorID; + workflowActionService.addOperator(mockScanPredicate, mockPoint); + + // give the operator a schema with attribute-autofill properties bound to input port 0 + const baseSchema = dynamicSchemaService.getDynamicSchema(operatorID); + dynamicSchemaService.setDynamicSchema(operatorID, { + ...baseSchema, + jsonSchema: { + type: "object", + properties: { + groupByKeys: { + type: "array", + autofill: "attributeNameList", + autofillAttributeOnPort: 0, + items: { type: "string" }, + }, + attribute: { type: "string", autofill: "attributeName", autofillAttributeOnPort: 0 }, + }, + } as any, + }); + + // stale references to "col_x", a column that does not exist on the new input + workflowActionService.setOperatorProperty(operatorID, { groupByKeys: ["col_x", "col_y"], attribute: "col_x" }); + + // the propagated input schema only contains "col_y" + vi.spyOn(service, "getOperatorInputSchemaMap").mockReturnValue({ + [serializePortIdentity({ id: 0, internal: false })]: [{ attributeName: "col_y", attributeType: "string" }], + } as any); + + // invoke the private propagation handler directly (normally triggered by a compile response) + (service as any).applySchemaPropagationResult(); + + const cleaned = workflowActionService.getTexeraGraph().getOperator(operatorID).operatorProperties; + expect(cleaned.groupByKeys).toEqual(["col_y"]); + expect(cleaned.attribute).toBe(""); + }); + + it("leaves valid property values untouched", () => { + const operatorID = mockScanPredicate.operatorID; + 
workflowActionService.addOperator(mockScanPredicate, mockPoint); + + const baseSchema = dynamicSchemaService.getDynamicSchema(operatorID); + dynamicSchemaService.setDynamicSchema(operatorID, { + ...baseSchema, + jsonSchema: { + type: "object", + properties: { + attribute: { type: "string", autofill: "attributeName", autofillAttributeOnPort: 0 }, + }, + } as any, + }); + + workflowActionService.setOperatorProperty(operatorID, { attribute: "col_y" }); + + vi.spyOn(service, "getOperatorInputSchemaMap").mockReturnValue({ + [serializePortIdentity({ id: 0, internal: false })]: [{ attributeName: "col_y", attributeType: "string" }], + } as any); + + const setSpy = vi.spyOn(workflowActionService, "setOperatorProperty"); + (service as any).applySchemaPropagationResult(); + + expect(setSpy).not.toHaveBeenCalled(); + expect(workflowActionService.getTexeraGraph().getOperator(operatorID).operatorProperties.attribute).toBe("col_y"); + }); +}); diff --git a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts index 9648d2b305..dae47a0088 100644 --- a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts +++ b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts @@ -19,6 +19,7 @@ import { HttpClient, HttpHeaders } from "@angular/common/http"; import { Injectable } from "@angular/core"; +import { JSONSchema7Definition } from "json-schema"; import { EMPTY, merge, Observable, ReplaySubject } from "rxjs"; import { CustomJSONSchema7 } from "src/app/workspace/types/custom-json-schema.interface"; import { AppSettings } from "../../../common/app-setting"; @@ -190,6 +191,19 @@ export class WorkflowCompilingService { let newDynamicSchema: OperatorSchema; if (inputSchema) { newDynamicSchema = WorkflowCompilingService.setOperatorInputAttrs(currentDynamicSchema, inputSchema); + + // Now that the list of input attributes is 
known, drop any operator property + // values that reference attributes which no longer exist in the input schema (e.g. a copy-pasted + // operator wired to a different upstream, or an operator re-wired to a new source). Otherwise + // these old references cause a compile error that survives even after clearing properties. + const operator = this.workflowActionService.getTexeraGraph().getOperator(operatorID); + const { value: cleanedProperties, changed } = WorkflowCompilingService.dropInvalidAttributeValues( + newDynamicSchema.jsonSchema, + operator.operatorProperties + ); + if (changed) { + this.workflowActionService.setOperatorProperty(operatorID, cleanedProperties); + } } else { // otherwise, the input attributes of the operator is unknown // if the operator is not a source operator, restore its original schema of input attributes @@ -392,6 +406,75 @@ export class WorkflowCompilingService { }; } + /** + * Walks an operator's property values with its json schema and drops any + * value that references an input attribute which is no longer valid. + * + * Only properties marked with an `autofill` annotation are affected, and only when the schema carries an + * `enum` of valid attribute names (i.e. the input schema is known). Two cases are handled: + * - `attributeName`: a single column name. Reset to "" if it's not in the enum. + * - `attributeNameList`: a list of column names. Filter out entries that aren't in the enum. + * + * Returns the (possibly new) properties object and whether anything changed. 
+ */ + public static dropInvalidAttributeValues( + schema: JSONSchema7Definition | undefined, + value: any + ): { value: any; changed: boolean } { + if (typeof schema !== "object" || schema === null || value === undefined || value === null) { + return { value, changed: false }; + } + const s = schema as CustomJSONSchema7; + + if (s.autofill === "attributeNameList") { + const itemEnum = (s.items as CustomJSONSchema7 | undefined)?.enum; + if (Array.isArray(value) && Array.isArray(itemEnum)) { + const filtered = value.filter(v => itemEnum.includes(v)); + return { value: filtered, changed: filtered.length !== value.length }; + } + return { value, changed: false }; + } + + if (s.autofill === "attributeName") { + if (Array.isArray(s.enum) && typeof value === "string" && !s.enum.includes(value)) { + return { value: "", changed: true }; + } + return { value, changed: false }; + } + + // recurse into object properties + if (s.properties && typeof value === "object" && !Array.isArray(value)) { + let changed = false; + const newValue = { ...value }; + Object.entries(s.properties).forEach(([key, propSchema]) => { + if (key in newValue) { + const res = WorkflowCompilingService.dropInvalidAttributeValues(propSchema, newValue[key]); + if (res.changed) { + newValue[key] = res.value; + changed = true; + } + } + }); + return { value: changed ? newValue : value, changed }; + } + + // recurse into array items (only when items is a single schema, not a tuple schema) + if (s.items && !Array.isArray(s.items) && Array.isArray(value)) { + let changed = false; + const newArr = value.map(item => { + const res = WorkflowCompilingService.dropInvalidAttributeValues(s.items as JSONSchema7Definition, item); + if (res.changed) { + changed = true; + return res.value; + } + return item; + }); + return { value: changed ? 
newArr : value, changed }; + } + + return { value, changed: false }; + } + public static restoreOperatorInputAttrs(operatorSchema: OperatorSchema): OperatorSchema { let newJsonSchema = operatorSchema.jsonSchema; diff --git a/frontend/src/app/workspace/service/dynamic-schema/dynamic-schema.service.ts b/frontend/src/app/workspace/service/dynamic-schema/dynamic-schema.service.ts index e2a7592772..50c33e7e86 100644 --- a/frontend/src/app/workspace/service/dynamic-schema/dynamic-schema.service.ts +++ b/frontend/src/app/workspace/service/dynamic-schema/dynamic-schema.service.ts @@ -120,9 +120,9 @@ export class DynamicSchemaService { /** * Sets the dynamic schema of an operator. If the new schema is different, also emit dynamic schema changed event. * - * The new dynamic schema is validated against the current operator properties. - * If the changed new dynamic schema invalidates some property, then the invalid properties fields will be dropped. - * + * Note: dropping operator property values that the new schema invalidates (e.g. attribute references that no + * longer exist after schema propagation) is handled by WorkflowCompilingService.dropInvalidAttributeValues, + * which has access to the propagated input attributes. */ public setDynamicSchema(operatorID: string, dynamicSchema: OperatorSchema): void { const currentDynamicSchema = this.dynamicSchemaMap.get(operatorID);
