This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5294-a8548128615ca5e9402384bf0752328c7e39ab12
in repository https://gitbox.apache.org/repos/asf/texera.git

commit a1045ce55e9169deb94f8ae3bf195692ef994ec5
Author: Kunwoo (Chris) <[email protected]>
AuthorDate: Sat May 30 12:31:06 2026 -0700

    fix(frontend): drop stale attribute references when an operator's input 
schema changes (#5294)
    
    ### What changes were proposed in this PR?
    
    Fixes a bug where copy-pasting a schema-propagated Aggregate operator
    and connecting it to a different upstream operator caused a compile
    error that persisted even after clearing all properties.
    
    **Why it is a bug**: An Aggregate stores property values that reference
    input column names — `groupByKeys` and `aggregations[].attribute`.
    Copy-paste clones `operatorProperties`, so the pasted operator keeps
    references to the *old* source's columns. When it is wired to a
    different operator, the backend `AggregateOpDesc` schema propagation
    calls `inputSchema.getAttribute("<old col>")`, which throws. These old
    column values were never removed.
    `DynamicSchemaService.setDynamicSchema` is documented to drop properties
    invalidated by a new schema, but the code never actually did it.
    
    **What changed:** When schema propagation produces an input-attribute
    list, `WorkflowCompilingService` now drops property values that the new
    schema invalidates via a new pure helper `dropInvalidAttributeValues`:
    - `autofill: "attributeName"` -> reset to `""` if not in the enum
    - `autofill: "attributeNameList"` -> filter out entries not in the enum
    - recurses through nested objects/arrays (so `aggregations[].attribute`
    is handled) and never mutates the input
    
    The misleading doc comment on `setDynamicSchema` now points to where the
    contract is fulfilled.
    
    ### Any related issues, documentation, discussions?
    
    Resolves #3070.
    
    ### How was this PR tested?
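    Manually tested (screen recording linked below). The new unit tests in
    `workflow-compiling.service.spec.ts` also cover the helper and the
    schema-propagation cleanup path.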
    
    
    
https://github.com/user-attachments/assets/d46e6198-10de-4efe-8c34-71981374cde8
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.8)
---
 .../workflow-compiling.service.spec.ts             | 242 +++++++++++++++++++++
 .../compile-workflow/workflow-compiling.service.ts |  83 +++++++
 .../dynamic-schema/dynamic-schema.service.ts       |   6 +-
 3 files changed, 328 insertions(+), 3 deletions(-)

diff --git 
a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.spec.ts
 
b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.spec.ts
new file mode 100644
index 0000000000..b456205f22
--- /dev/null
+++ 
b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.spec.ts
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { JSONSchema7Definition } from "json-schema";
+import { TestBed } from "@angular/core/testing";
+import { WorkflowCompilingService } from "./workflow-compiling.service";
+import { WorkflowActionService } from 
"../workflow-graph/model/workflow-action.service";
+import { DynamicSchemaService } from 
"../dynamic-schema/dynamic-schema.service";
+import { ValidationWorkflowService } from 
"../validation/validation-workflow.service";
+import { OperatorMetadataService } from 
"../operator-metadata/operator-metadata.service";
+import { StubOperatorMetadataService } from 
"../operator-metadata/stub-operator-metadata.service";
+import { JointUIService } from "../joint-ui/joint-ui.service";
+import { WorkflowUtilService } from 
"../workflow-graph/util/workflow-util.service";
+import { UndoRedoService } from "../undo-redo/undo-redo.service";
+import { mockPoint, mockScanPredicate } from 
"../workflow-graph/model/mock-workflow-data";
+import { serializePortIdentity } from 
"../../../common/util/port-identity-serde";
+import { commonTestImports, commonTestProviders } from 
"../../../common/testing/test-utils";
+
+describe("WorkflowCompilingService.dropInvalidAttributeValues", () => {
+  // A schema shaped like the Aggregate operator after schema propagation has 
filled in the
+  // valid input attribute names ("col_y" is the only attribute available on 
the new input).
+  const aggregateSchema = (): JSONSchema7Definition =>
+    ({
+      type: "object",
+      properties: {
+        groupByKeys: {
+          type: "array",
+          autofill: "attributeNameList",
+          items: { type: "string", enum: ["col_y", ""] },
+        },
+        aggregations: {
+          type: "array",
+          items: {
+            type: "object",
+            properties: {
+              attribute: { type: "string", autofill: "attributeName", enum: 
["col_y"] },
+              aggFunction: { type: "string" },
+              resultAttribute: { type: "string" },
+            },
+          },
+        },
+      },
+    }) as unknown as JSONSchema7Definition;
+
+  it("drops list entries and resets single attributes that are no longer 
valid", () => {
+    const properties = {
+      groupByKeys: ["col_x", "col_y"],
+      aggregations: [{ attribute: "col_x", aggFunction: "sum", 
resultAttribute: "r" }],
+    };
+
+    const { value, changed } = 
WorkflowCompilingService.dropInvalidAttributeValues(aggregateSchema(), 
properties);
+
+    expect(changed).toBe(true);
+    expect(value.groupByKeys).toEqual(["col_y"]);
+    expect(value.aggregations[0].attribute).toBe("");
+    // non-attribute fields are preserved
+    expect(value.aggregations[0].aggFunction).toBe("sum");
+    expect(value.aggregations[0].resultAttribute).toBe("r");
+    // the input object is never mutated
+    expect(properties.groupByKeys).toEqual(["col_x", "col_y"]);
+    expect(properties.aggregations[0].attribute).toBe("col_x");
+  });
+
+  it("reports no change when all attribute references are valid", () => {
+    const properties = {
+      groupByKeys: ["col_y"],
+      aggregations: [{ attribute: "col_y", aggFunction: "sum", 
resultAttribute: "r" }],
+    };
+
+    const { value, changed } = 
WorkflowCompilingService.dropInvalidAttributeValues(aggregateSchema(), 
properties);
+
+    expect(changed).toBe(false);
+    expect(value).toBe(properties);
+  });
+
+  it("makes no change when the input schema (enum) is unknown", () => {
+    const schemaWithoutEnum: JSONSchema7Definition = {
+      type: "object",
+      properties: {
+        groupByKeys: {
+          type: "array",
+          autofill: "attributeNameList",
+          items: { type: "string" },
+        },
+        aggregations: {
+          type: "array",
+          items: {
+            type: "object",
+            properties: {
+              attribute: { type: "string", autofill: "attributeName" },
+            },
+          },
+        },
+      },
+    } as unknown as JSONSchema7Definition;
+
+    const properties = {
+      groupByKeys: ["col_x"],
+      aggregations: [{ attribute: "col_x" }],
+    };
+
+    const { value, changed } = 
WorkflowCompilingService.dropInvalidAttributeValues(schemaWithoutEnum, 
properties);
+
+    expect(changed).toBe(false);
+    expect(value).toBe(properties);
+  });
+
+  it("returns the value unchanged for non-object schemas or nullish values", 
() => {
+    // boolean schema (e.g. `additionalProperties: true`)
+    expect(WorkflowCompilingService.dropInvalidAttributeValues(true, { a: 1 
})).toEqual({
+      value: { a: 1 },
+      changed: false,
+    });
+    // null / undefined values are not walked
+    
expect(WorkflowCompilingService.dropInvalidAttributeValues(aggregateSchema(), 
null)).toEqual({
+      value: null,
+      changed: false,
+    });
+    
expect(WorkflowCompilingService.dropInvalidAttributeValues(aggregateSchema(), 
undefined)).toEqual({
+      value: undefined,
+      changed: false,
+    });
+  });
+
+  it("skips schema properties that are absent from the value object", () => {
+    // the value is missing both `groupByKeys` and `aggregations` defined in 
the schema
+    const properties = { unrelated: "keep-me" };
+
+    const { value, changed } = 
WorkflowCompilingService.dropInvalidAttributeValues(aggregateSchema(), 
properties);
+
+    expect(changed).toBe(false);
+    expect(value).toBe(properties);
+  });
+});
+
+describe("WorkflowCompilingService schema propagation property cleanup", () => 
{
+  let service: WorkflowCompilingService;
+  let workflowActionService: WorkflowActionService;
+  let dynamicSchemaService: DynamicSchemaService;
+
+  beforeEach(() => {
+    TestBed.configureTestingModule({
+      imports: [...commonTestImports],
+      providers: [
+        { provide: OperatorMetadataService, useClass: 
StubOperatorMetadataService },
+        JointUIService,
+        WorkflowActionService,
+        WorkflowUtilService,
+        UndoRedoService,
+        DynamicSchemaService,
+        ValidationWorkflowService,
+        WorkflowCompilingService,
+        ...commonTestProviders,
+      ],
+    });
+    service = TestBed.inject(WorkflowCompilingService);
+    workflowActionService = TestBed.inject(WorkflowActionService);
+    dynamicSchemaService = TestBed.inject(DynamicSchemaService);
+  });
+
+  it("drops operator property values that the propagated input schema no 
longer supports", () => {
+    const operatorID = mockScanPredicate.operatorID;
+    workflowActionService.addOperator(mockScanPredicate, mockPoint);
+
+    // give the operator a schema with attribute-autofill properties bound to 
input port 0
+    const baseSchema = dynamicSchemaService.getDynamicSchema(operatorID);
+    dynamicSchemaService.setDynamicSchema(operatorID, {
+      ...baseSchema,
+      jsonSchema: {
+        type: "object",
+        properties: {
+          groupByKeys: {
+            type: "array",
+            autofill: "attributeNameList",
+            autofillAttributeOnPort: 0,
+            items: { type: "string" },
+          },
+          attribute: { type: "string", autofill: "attributeName", 
autofillAttributeOnPort: 0 },
+        },
+      } as any,
+    });
+
+    // stale references to "col_x", a column that does not exist on the new 
input
+    workflowActionService.setOperatorProperty(operatorID, { groupByKeys: 
["col_x", "col_y"], attribute: "col_x" });
+
+    // the propagated input schema only contains "col_y"
+    vi.spyOn(service, "getOperatorInputSchemaMap").mockReturnValue({
+      [serializePortIdentity({ id: 0, internal: false })]: [{ attributeName: 
"col_y", attributeType: "string" }],
+    } as any);
+
+    // invoke the private propagation handler directly (normally triggered by 
a compile response)
+    (service as any).applySchemaPropagationResult();
+
+    const cleaned = 
workflowActionService.getTexeraGraph().getOperator(operatorID).operatorProperties;
+    expect(cleaned.groupByKeys).toEqual(["col_y"]);
+    expect(cleaned.attribute).toBe("");
+  });
+
+  it("leaves valid property values untouched", () => {
+    const operatorID = mockScanPredicate.operatorID;
+    workflowActionService.addOperator(mockScanPredicate, mockPoint);
+
+    const baseSchema = dynamicSchemaService.getDynamicSchema(operatorID);
+    dynamicSchemaService.setDynamicSchema(operatorID, {
+      ...baseSchema,
+      jsonSchema: {
+        type: "object",
+        properties: {
+          attribute: { type: "string", autofill: "attributeName", 
autofillAttributeOnPort: 0 },
+        },
+      } as any,
+    });
+
+    workflowActionService.setOperatorProperty(operatorID, { attribute: "col_y" 
});
+
+    vi.spyOn(service, "getOperatorInputSchemaMap").mockReturnValue({
+      [serializePortIdentity({ id: 0, internal: false })]: [{ attributeName: 
"col_y", attributeType: "string" }],
+    } as any);
+
+    const setSpy = vi.spyOn(workflowActionService, "setOperatorProperty");
+    (service as any).applySchemaPropagationResult();
+
+    expect(setSpy).not.toHaveBeenCalled();
+    
expect(workflowActionService.getTexeraGraph().getOperator(operatorID).operatorProperties.attribute).toBe("col_y");
+  });
+});
diff --git 
a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
 
b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
index 9648d2b305..dae47a0088 100644
--- 
a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
+++ 
b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
@@ -19,6 +19,7 @@
 
 import { HttpClient, HttpHeaders } from "@angular/common/http";
 import { Injectable } from "@angular/core";
+import { JSONSchema7Definition } from "json-schema";
 import { EMPTY, merge, Observable, ReplaySubject } from "rxjs";
 import { CustomJSONSchema7 } from 
"src/app/workspace/types/custom-json-schema.interface";
 import { AppSettings } from "../../../common/app-setting";
@@ -190,6 +191,19 @@ export class WorkflowCompilingService {
       let newDynamicSchema: OperatorSchema;
       if (inputSchema) {
         newDynamicSchema = 
WorkflowCompilingService.setOperatorInputAttrs(currentDynamicSchema, 
inputSchema);
+
+        // Now that the list of input attributes is known, drop any operator 
property
+        // values that reference attributes which no longer exist in the input 
schema (e.g. a copy-pasted
+        // operator wired to a different upstream, or an operator re-wired to 
a new source). Otherwise
+        // these old references cause a compile error that survives even after 
clearing properties.
+        const operator = 
this.workflowActionService.getTexeraGraph().getOperator(operatorID);
+        const { value: cleanedProperties, changed } = 
WorkflowCompilingService.dropInvalidAttributeValues(
+          newDynamicSchema.jsonSchema,
+          operator.operatorProperties
+        );
+        if (changed) {
+          this.workflowActionService.setOperatorProperty(operatorID, 
cleanedProperties);
+        }
       } else {
         // otherwise, the input attributes of the operator is unknown
         // if the operator is not a source operator, restore its original 
schema of input attributes
@@ -392,6 +406,75 @@ export class WorkflowCompilingService {
     };
   }
 
+  /**
+   * Walks an operator's property values with its json schema and drops any
+   * value that references an input attribute which is no longer valid.
+   *
+   * Only properties marked with an `autofill` annotation are affected, and only when the input schema is known,
+   * i.e. the schema (or, for a list, its `items` schema) carries an `enum` of valid names. Two cases are handled:
+   *  - `attributeName`: a single column name. Reset to "" if it's not in the enum (this includes an existing "").
+   *  - `attributeNameList`: a list of column names. Filter out entries that 
aren't in the enum.
+   *
+   * Returns the (possibly new) properties object and whether anything changed.
+   */
+  public static dropInvalidAttributeValues(
+    schema: JSONSchema7Definition | undefined,
+    value: any
+  ): { value: any; changed: boolean } {
+    if (typeof schema !== "object" || schema === null || value === undefined 
|| value === null) {
+      return { value, changed: false };
+    }
+    const s = schema as CustomJSONSchema7;
+
+    if (s.autofill === "attributeNameList") {
+      const itemEnum = (s.items as CustomJSONSchema7 | undefined)?.enum;
+      if (Array.isArray(value) && Array.isArray(itemEnum)) {
+        const filtered = value.filter(v => itemEnum.includes(v));
+        return { value: filtered, changed: filtered.length !== value.length };
+      }
+      return { value, changed: false };
+    }
+
+    if (s.autofill === "attributeName") {
+      if (Array.isArray(s.enum) && typeof value === "string" && 
!s.enum.includes(value)) {
+        return { value: "", changed: true };
+      }
+      return { value, changed: false };
+    }
+
+    // recurse into object properties
+    if (s.properties && typeof value === "object" && !Array.isArray(value)) {
+      let changed = false;
+      const newValue = { ...value };
+      Object.entries(s.properties).forEach(([key, propSchema]) => {
+        if (key in newValue) {
+          const res = 
WorkflowCompilingService.dropInvalidAttributeValues(propSchema, newValue[key]);
+          if (res.changed) {
+            newValue[key] = res.value;
+            changed = true;
+          }
+        }
+      });
+      return { value: changed ? newValue : value, changed };
+    }
+
+    // recurse into array items (only when items is a single schema, not a 
tuple schema)
+    if (s.items && !Array.isArray(s.items) && Array.isArray(value)) {
+      let changed = false;
+      const newArr = value.map(item => {
+        const res = 
WorkflowCompilingService.dropInvalidAttributeValues(s.items as 
JSONSchema7Definition, item);
+        if (res.changed) {
+          changed = true;
+          return res.value;
+        }
+        return item;
+      });
+      return { value: changed ? newArr : value, changed };
+    }
+
+    return { value, changed: false };
+  }
+
   public static restoreOperatorInputAttrs(operatorSchema: OperatorSchema): 
OperatorSchema {
     let newJsonSchema = operatorSchema.jsonSchema;
 
diff --git 
a/frontend/src/app/workspace/service/dynamic-schema/dynamic-schema.service.ts 
b/frontend/src/app/workspace/service/dynamic-schema/dynamic-schema.service.ts
index e2a7592772..50c33e7e86 100644
--- 
a/frontend/src/app/workspace/service/dynamic-schema/dynamic-schema.service.ts
+++ 
b/frontend/src/app/workspace/service/dynamic-schema/dynamic-schema.service.ts
@@ -120,9 +120,9 @@ export class DynamicSchemaService {
   /**
    * Sets the dynamic schema of an operator. If the new schema is different, 
also emit dynamic schema changed event.
    *
-   * The new dynamic schema is validated against the current operator 
properties.
-   * If the changed new dynamic schema invalidates some property, then the 
invalid properties fields will be dropped.
-   *
+   * Note: dropping operator property values that the new schema invalidates 
(e.g. attribute references that no
+   * longer exist after schema propagation) is handled by 
WorkflowCompilingService.dropInvalidAttributeValues,
+   * which has access to the propagated input attributes.
    */
   public setDynamicSchema(operatorID: string, dynamicSchema: OperatorSchema): 
void {
     const currentDynamicSchema = this.dynamicSchemaMap.get(operatorID);

Reply via email to