This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5825-357fed00765169b280136803861fb1f59ec6589d in repository https://gitbox.apache.org/repos/asf/texera.git
commit f2d573705981d98ccb946f88182fa59cb82c8614 Author: Xinyuan Lin <[email protected]> AuthorDate: Sat Jun 20 17:37:50 2026 -0700 test(workflow-operator): add unit test coverage for sort and set operator descriptors (#5825) ### What changes were proposed in this PR? Pin the behavior of three previously untested descriptors in `common/workflow-operator/` that reorder rows or combine relations. No production-code changes. | Spec | Source class | Tests | | --- | --- | --- | | `StableMergeSortOpDescSpec` | `StableMergeSortOpDesc` | 3 | | `SortPartitionsOpDescSpec` | `SortPartitionsOpDesc` | 3 | | `SymmetricDifferenceOpDescSpec` | `SymmetricDifferenceOpDesc` | 5 | All three spec files follow the `<srcClassName>Spec.scala` one-to-one convention. **Behavior pinned** | Surface | Contract | | --- | --- | | `operatorInfo` | exact name + group (`SORT_GROUP` / `SET_GROUP`); blocking output port (sort/set must observe all rows); `SymmetricDifference` advertises two inputs `PortIdentity(0)`/`PortIdentity(1)` | | `getPhysicalOp` wiring | `opExecInitInfo` pattern-matches `OpExecWithClassName` with the exact executor FQCN; port **identities** carried forward (`keySet`, not counts) | | `StableMergeSort` | non-parallelizable many-to-one op; deserializes its `List[SortCriteriaUnit]` sort keys | | `SortPartitions` | `RangePartition(List(attr), domainMin, domainMax)` partition requirement; field round-trip | | `SymmetricDifference` | `HashPartition` requirement on both inputs; schema propagation passes the shared input schema through and throws `IllegalArgumentException` when the two inputs' schemas differ; polymorphic round-trip pinning the `@JsonSubTypes` discriminator (the operator has no config fields) | ### Any related issues, documentation, discussions? Closes #5822. ### How was this PR tested? 
Pure unit-test additions; verified locally with: - `sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.sort.StableMergeSortOpDescSpec org.apache.texera.amber.operator.sortPartitions.SortPartitionsOpDescSpec org.apache.texera.amber.operator.symmetricDifference.SymmetricDifferenceOpDescSpec"` — 10 tests, all green - `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt "WorkflowOperator/Test/scalafix --check"` — clean - CI to confirm ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Claude Code (Opus 4.8 [1M context]) --- .../operator/sort/StableMergeSortOpDescSpec.scala | 70 +++++++++++++++++ .../sortPartitions/SortPartitionsOpDescSpec.scala | 80 +++++++++++++++++++ .../SymmetricDifferenceOpDescSpec.scala | 89 ++++++++++++++++++++++ 3 files changed, 239 insertions(+) diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sort/StableMergeSortOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sort/StableMergeSortOpDescSpec.scala new file mode 100644 index 0000000000..933549a3b3 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sort/StableMergeSortOpDescSpec.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.sort + +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class StableMergeSortOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + "StableMergeSortOpDesc.operatorInfo" should + "advertise the name, Sort group, and a single blocking output" in { + val info = (new StableMergeSortOpDesc).operatorInfo + info.userFriendlyName shouldBe "Stable Merge Sort" + info.operatorGroupName shouldBe OperatorGroupConstants.SORT_GROUP + info.inputPorts should have length 1 + info.outputPorts should have length 1 + // A stable sort must observe all rows before emitting, so the output blocks. 
+ info.outputPorts.head.blocking shouldBe true + } + + "StableMergeSortOpDesc.getPhysicalOp" should + "be a non-parallelizable many-to-one op wiring StableMergeSortOpExec" in { + val op = new StableMergeSortOpDesc + val physical = op.getPhysicalOp(workflowId, executionId) + physical.parallelizable shouldBe false + physical.opExecInitInfo match { + case OpExecWithClassName(className, descString) => + className shouldBe "org.apache.texera.amber.operator.sort.StableMergeSortOpExec" + descString should not be empty + case other => fail(s"expected OpExecWithClassName, got $other") + } + physical.inputPorts.keySet shouldBe op.operatorInfo.inputPorts.map(_.id).toSet + physical.outputPorts.keySet shouldBe op.operatorInfo.outputPorts.map(_.id).toSet + } + + "StableMergeSortOpDesc" should + "deserialize its sort keys (List of SortCriteriaUnit) through the polymorphic base" in { + val json = + """{"operatorType":"StableMergeSort","keys":[{"attribute":"age","sortPreference":"DESC"}]}""" + val desc = objectMapper.readValue(json, classOf[LogicalOp]).asInstanceOf[StableMergeSortOpDesc] + desc.keys should have size 1 + desc.keys.head.attributeName shouldBe "age" + desc.keys.head.sortPreference shouldBe SortPreference.DESC + } +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sortPartitions/SortPartitionsOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sortPartitions/SortPartitionsOpDescSpec.scala new file mode 100644 index 0000000000..d9ae20b14a --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sortPartitions/SortPartitionsOpDescSpec.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.sortPartitions + +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.RangePartition +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class SortPartitionsOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + private def newDesc(attr: String, min: Long, max: Long): SortPartitionsOpDesc = { + val d = new SortPartitionsOpDesc + d.sortAttributeName = attr + d.domainMin = min + d.domainMax = max + d + } + + "SortPartitionsOpDesc.operatorInfo" should + "advertise the name, Sort group, and a single blocking output" in { + val info = (new SortPartitionsOpDesc).operatorInfo + info.userFriendlyName shouldBe "Sort Partitions" + info.operatorGroupName shouldBe OperatorGroupConstants.SORT_GROUP + info.inputPorts should have length 1 + info.outputPorts should have length 1 + info.outputPorts.head.blocking shouldBe true + } + + "SortPartitionsOpDesc.getPhysicalOp" should + "wire 
SortPartitionsOpExec and require a RangePartition over the sort attribute/domain" in { + val op = newDesc("score", 0L, 100L) + val physical = op.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithClassName(className, descString) => + className shouldBe "org.apache.texera.amber.operator.sortPartitions.SortPartitionsOpExec" + descString should not be empty + case other => fail(s"expected OpExecWithClassName, got $other") + } + physical.inputPorts.keySet shouldBe op.operatorInfo.inputPorts.map(_.id).toSet + physical.outputPorts.keySet shouldBe op.operatorInfo.outputPorts.map(_.id).toSet + physical.partitionRequirement shouldBe List( + Option(RangePartition(List("score"), 0L, 100L)) + ) + } + + "SortPartitionsOpDesc" should "round-trip its attribute/domain fields through the polymorphic base" in { + val json = objectMapper.writeValueAsString(newDesc("age", 1L, 99L)) + val restored = objectMapper.readValue(json, classOf[LogicalOp]) + restored shouldBe a[SortPartitionsOpDesc] + val sp = restored.asInstanceOf[SortPartitionsOpDesc] + sp.sortAttributeName shouldBe "age" + sp.domainMin shouldBe 1L + sp.domainMax shouldBe 99L + } +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/symmetricDifference/SymmetricDifferenceOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/symmetricDifference/SymmetricDifferenceOpDescSpec.scala new file mode 100644 index 0000000000..e6e2995877 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/symmetricDifference/SymmetricDifferenceOpDescSpec.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.symmetricDifference + +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{HashPartition, PortIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class SymmetricDifferenceOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + private val schemaA = Schema().add(new Attribute("a", AttributeType.STRING)) + private val schemaDifferent = Schema().add(new Attribute("b", AttributeType.INTEGER)) + + "SymmetricDifferenceOpDesc.operatorInfo" should + "advertise the name, Set group, two inputs, and a single blocking output" in { + val info = (new SymmetricDifferenceOpDesc).operatorInfo + info.userFriendlyName shouldBe "SymmetricDifference" + info.operatorGroupName shouldBe OperatorGroupConstants.SET_GROUP + info.inputPorts.map(_.id) shouldBe List(PortIdentity(0), PortIdentity(1)) + info.outputPorts 
should have length 1 + info.outputPorts.head.blocking shouldBe true + } + + "SymmetricDifferenceOpDesc.getPhysicalOp" should + "wire SymmetricDifferenceOpExec, carry port identities, and require HashPartition on both inputs" in { + val op = new SymmetricDifferenceOpDesc + val physical = op.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithClassName(className, _) => + className shouldBe "org.apache.texera.amber.operator.symmetricDifference.SymmetricDifferenceOpExec" + case other => fail(s"expected OpExecWithClassName, got $other") + } + physical.inputPorts.keySet shouldBe op.operatorInfo.inputPorts.map(_.id).toSet + physical.outputPorts.keySet shouldBe op.operatorInfo.outputPorts.map(_.id).toSet + physical.partitionRequirement shouldBe List(Option(HashPartition()), Option(HashPartition())) + } + + "SymmetricDifferenceOpDesc schema propagation" should + "pass the shared input schema through to the output port" in { + val op = new SymmetricDifferenceOpDesc + val physical = op.getPhysicalOp(workflowId, executionId) + val inputs = Map(PortIdentity(0) -> schemaA, PortIdentity(1) -> schemaA) + val outputs = physical.propagateSchema.func(inputs) + outputs.keySet shouldBe op.operatorInfo.outputPorts.map(_.id).toSet + outputs.values.toSet shouldBe Set(schemaA) + } + + it should "reject inputs whose schemas differ" in { + val physical = (new SymmetricDifferenceOpDesc).getPhysicalOp(workflowId, executionId) + val mismatched = Map(PortIdentity(0) -> schemaA, PortIdentity(1) -> schemaDifferent) + intercept[IllegalArgumentException] { + physical.propagateSchema.func(mismatched) + } + } + + "SymmetricDifferenceOpDesc" should + "round-trip through the polymorphic base (pins the SymmetricDifference discriminator)" in { + // The operator has no config fields, so this pins the @JsonSubTypes + // discriminator + type resolution, on par with the other specs in this PR. 
+ val json = objectMapper.writeValueAsString(new SymmetricDifferenceOpDesc) + objectMapper.readValue(json, classOf[LogicalOp]) shouldBe a[SymmetricDifferenceOpDesc] + } +}
