This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/texera.git
commit e3df0ceedcc6514bf2387bcdb8b29cdb51fa28e0 Author: Xinyuan Lin <[email protected]> AuthorDate: Sat Jun 20 20:58:41 2026 -0700 test(workflow-operator): add unit test coverage for control and aggregate operator descriptors (If, Dummy, Aggregate) (#5833) ### What changes were proposed in this PR? Pin behavior of three core non-source operator descriptors in `common/workflow-operator/`. No production-code changes. | Spec | Source class | Tests | | --- | --- | --- | | `IfOpDescSpec` | `IfOpDesc` | 4 | | `DummyOpDescSpec` | `DummyOpDesc` | 5 | | `AggregateOpDescSpec` | `AggregateOpDesc` | 4 | **Behavior pinned — `IfOpDesc`** | Surface | Contract | | --- | --- | | `operatorInfo` | `If`, CONTROL_GROUP; 2 inputs (`Condition` at `PortIdentity()`, data at `PortIdentity(1)`); 2 outputs `False`/`True` | | `conditionName` | defaults to `null` | | `getPhysicalOp` | wires `IfOpExec`, non-parallelizable, port identities carried | | Schema propagation | routes the **data** input's schema (`inputPorts.last`) to **both** outputs; the condition-port schema is dropped | **Behavior pinned — `DummyOpDesc`** | Surface | Contract | | --- | --- | | `operatorInfo` | `Dummy`, UTILITY_GROUP; all four `dynamicInputPorts`/`dynamicOutputPorts`/`supportReconfiguration`/`allowPortCustomization` flags `true` | | Port derivation | null port lists → single default `InputPort()`/`OutputPort()`; explicit `PortDescription` list → one port per element, indexed by position | | `dummyOperator` | defaults to `""` | | `getPhysicalOp` | the unimplemented `LogicalOp` stub → `NotImplementedError` | **Behavior pinned — `AggregateOpDesc`** | Surface | Contract | | --- | --- | | `operatorInfo` | `Aggregate`, AGGREGATE_GROUP, 1-in/1-out, `supportReconfiguration == false` | | `getPhysicalPlan` | a two-stage (`localAgg` + `globalAgg`) plan with one connecting link | | Schema propagation | group-by keys + per-aggregation result column; SUM keeps the input type, COUNT → INTEGER, AVERAGE → DOUBLE | **Note for 
reviewers:** each `AggregateOpDesc` test builds a **fresh** descriptor — `getPhysicalPlan` mutates `aggregations` via `getFinal` (e.g. COUNT → SUM) and is intentionally not idempotent. ### Any related issues, documentation, discussions? Closes #5830. ### How was this PR tested? - `sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.ifStatement.IfOpDescSpec org.apache.texera.amber.operator.dummy.DummyOpDescSpec org.apache.texera.amber.operator.aggregate.AggregateOpDescSpec"` — 13 tests, all green - `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt "WorkflowOperator/Test/scalafix --check"` — clean - CI to confirm ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Claude Code (Opus 4.8 [1M context]) --- .../operator/aggregate/AggregateOpDescSpec.scala | 90 ++++++++++++++++++++++ .../amber/operator/dummy/DummyOpDescSpec.scala | 80 +++++++++++++++++++ .../amber/operator/ifStatement/IfOpDescSpec.scala | 79 +++++++++++++++++++ 3 files changed, 249 insertions(+) diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/aggregate/AggregateOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/aggregate/AggregateOpDescSpec.scala new file mode 100644 index 0000000000..681a1aa2be --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/aggregate/AggregateOpDescSpec.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.aggregate + +import org.apache.texera.amber.core.tuple.{AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.PortIdentity +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class AggregateOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + private def aggOp(fn: AggregationFunction, attr: String, result: String): AggregationOperation = { // one aggregation: `fn` over `attr`, written to `result` + val a = new AggregationOperation() + a.aggFunction = fn + a.attribute = attr + a.resultAttribute = result + a + } + + // Each test builds a FRESH desc: getPhysicalPlan mutates `aggregations` (getFinal), + // so the descriptor is intentionally not idempotent across calls. 
+ private def descWith(keys: List[String], aggs: AggregationOperation*): AggregateOpDesc = { // group by `keys`; aggregations kept in argument order + val d = new AggregateOpDesc + d.groupByKeys = keys + d.aggregations = aggs.toList + d + } + + "AggregateOpDesc.operatorInfo" should "advertise the name and Aggregate group" in { + val info = (new AggregateOpDesc).operatorInfo + info.userFriendlyName shouldBe "Aggregate" + info.operatorDescription shouldBe "Calculate different types of aggregation values" + info.operatorGroupName shouldBe OperatorGroupConstants.AGGREGATE_GROUP + info.inputPorts should have length 1 + info.outputPorts should have length 1 + info.supportReconfiguration shouldBe false + } + + "AggregateOpDesc.getPhysicalPlan" should + "build a two-stage (localAgg + globalAgg) plan with one connecting link" in { + val plan = descWith(List("city"), aggOp(AggregationFunction.SUM, "sales", "total")) + .getPhysicalPlan(workflowId, executionId) + plan.operators should have size 2 // only the count is pinned, not the stage identities + plan.links should have size 1 + } + + "AggregateOpDesc schema propagation" should + "produce the group-by keys plus the aggregation result column (SUM keeps the input type)" in { + val input = Schema().add("city", AttributeType.STRING).add("sales", AttributeType.INTEGER) + val out = descWith(List("city"), aggOp(AggregationFunction.SUM, "sales", "total")) + .getExternalOutputSchemas(Map(PortIdentity() -> input)) + out shouldBe Map( + PortIdentity() -> Schema() + .add("city", AttributeType.STRING) + .add("total", AttributeType.INTEGER) + ) + } + + it should "type a COUNT result as INTEGER and an AVERAGE result as DOUBLE" in { + val input = Schema().add("v", AttributeType.LONG) // LONG input: the COUNT/AVERAGE result types asserted below differ from it + descWith(List.empty, aggOp(AggregationFunction.COUNT, "v", "cnt")) + .getExternalOutputSchemas(Map(PortIdentity() -> input)) shouldBe + Map(PortIdentity() -> Schema().add("cnt", 
AttributeType.INTEGER)) + descWith(List.empty, aggOp(AggregationFunction.AVERAGE, "v", "avg")) + .getExternalOutputSchemas(Map(PortIdentity() -> input)) shouldBe + Map(PortIdentity() ->
Schema().add("avg", AttributeType.DOUBLE)) + } +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/dummy/DummyOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/dummy/DummyOpDescSpec.scala new file mode 100644 index 0000000000..6f23261b27 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/dummy/DummyOpDescSpec.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.texera.amber.operator.dummy + +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PortIdentity, UnknownPartition} +import org.apache.texera.amber.operator.PortDescription +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class DummyOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + "DummyOpDesc.operatorInfo" should + "advertise the Utility group and enable all dynamic / reconfiguration / customization flags" in { + val info = (new DummyOpDesc).operatorInfo + info.userFriendlyName shouldBe "Dummy" + info.operatorDescription shouldBe "A dummy operator used as a placeholder." + info.operatorGroupName shouldBe OperatorGroupConstants.UTILITY_GROUP + info.dynamicInputPorts shouldBe true + info.dynamicOutputPorts shouldBe true + info.supportReconfiguration shouldBe true + info.allowPortCustomization shouldBe true + } + + "DummyOpDesc" should + "derive a single default input/output port when the port lists are unset (null)" in { + val info = (new DummyOpDesc).operatorInfo // inputPorts/outputPorts are never assigned on this instance + info.inputPorts shouldBe List(InputPort()) + info.outputPorts shouldBe List(OutputPort()) + } + + it should "derive ports from an explicit PortDescriptor list, indexed by position" in { + val d = new DummyOpDesc + d.inputPorts = List( + PortDescription( + "p0", // NOTE(review): presumably the port-id field of PortDescription; not asserted here — confirm + "first", // surfaces as the derived port's displayName (asserted below) + disallowMultiInputs = false, + isDynamicPort = false, + UnknownPartition() + ) + ) + val ports = d.operatorInfo.inputPorts + ports should have length 1 + 
ports.head.id shouldBe PortIdentity(0) + ports.head.displayName shouldBe "first" + } + + "DummyOpDesc.dummyOperator" should "default to the empty string" in { + (new DummyOpDesc).dummyOperator shouldBe "" + } +
"DummyOpDesc.getPhysicalOp" should + "be the unimplemented LogicalOp stub (throws NotImplementedError)" in { + intercept[NotImplementedError] { // a placeholder operator is expected to have no physical implementation + (new DummyOpDesc).getPhysicalOp(workflowId, executionId) + } + } +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/ifStatement/IfOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/ifStatement/IfOpDescSpec.scala new file mode 100644 index 0000000000..49cd325d57 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/ifStatement/IfOpDescSpec.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.texera.amber.operator.ifStatement + +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.PortIdentity +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class IfOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + "IfOpDesc.operatorInfo" should + "advertise two inputs (Condition + data) and two outputs (False/True) in the Control group" in { + val info = (new IfOpDesc).operatorInfo + info.userFriendlyName shouldBe "If" + info.operatorGroupName shouldBe OperatorGroupConstants.CONTROL_GROUP + info.inputPorts should have length 2 + info.inputPorts.head.id shouldBe PortIdentity() + info.inputPorts.head.displayName shouldBe "Condition" + info.inputPorts.last.id shouldBe PortIdentity(1) + info.outputPorts.map(_.id) shouldBe List(PortIdentity(), PortIdentity(1)) + info.outputPorts.head.displayName shouldBe "False" + info.outputPorts.last.displayName shouldBe "True" + } + + "IfOpDesc.conditionName" should "default to null" in { + (new IfOpDesc).conditionName shouldBe null + } + + "IfOpDesc.getPhysicalOp" should + "wire IfOpExec, be non-parallelizable, and carry the port identities" in { + val op = new IfOpDesc + op.conditionName = "ready" // arbitrary non-null value; only the wiring is asserted below + val physical = op.getPhysicalOp(workflowId, executionId) + physical.parallelizable shouldBe false + physical.opExecInitInfo match { + case OpExecWithClassName(className, descString) => + className shouldBe "org.apache.texera.amber.operator.ifStatement.IfOpExec" + descString should not be 
empty + case other => fail(s"expected OpExecWithClassName, got $other") + } +
+ physical.inputPorts.keySet shouldBe op.operatorInfo.inputPorts.map(_.id).toSet + physical.outputPorts.keySet shouldBe op.operatorInfo.outputPorts.map(_.id).toSet + } + + "IfOpDesc schema propagation" should + "route the data input's schema (inputPorts.last) to BOTH outputs, dropping the condition schema" in { + val physical = (new IfOpDesc).getPhysicalOp(workflowId, executionId) // conditionName left at its null default + val condSchema = Schema().add(new Attribute("cond", AttributeType.BOOLEAN)) + val dataSchema = Schema().add(new Attribute("payload", AttributeType.STRING)) + val out = physical.propagateSchema.func( + Map(PortIdentity() -> condSchema, PortIdentity(1) -> dataSchema) // port 0 = Condition, port 1 = data, per the operatorInfo test + ) + out shouldBe Map(PortIdentity() -> dataSchema, PortIdentity(1) -> dataSchema) + } +}
