This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 0efbc0f59c test(workflow-operator): add unit test coverage for
SET-family LogicalOp descriptors (#5738)
0efbc0f59c is described below
commit 0efbc0f59cad0a660912fab63de04a4860d8b42c
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed Jun 17 15:07:43 2026 -0700
test(workflow-operator): add unit test coverage for SET-family LogicalOp
descriptors (#5738)
### What changes were proposed in this PR?
Pin the behavior of three previously uncovered `LogicalOp` descriptors in
the SET / cleaning operator family. For each descriptor, the new specs pin
the physical-op class name and port shape it wires through `getPhysicalOp`,
plus its partitioning and schema-propagation contracts where applicable.
No production-code changes.
| Spec | Source class | Tests |
| --- | --- | --- |
| `UnionOpDescSpec` | `UnionOpDesc` | 6 |
| `DistinctOpDescSpec` | `DistinctOpDesc` | 7 |
| `DifferenceOpDescSpec` | `DifferenceOpDesc` | 8 |
All three spec files follow the `<srcClassName>Spec.scala` one-to-one
convention. `IntersectOpDescSpec` already exists and gave us the
spec-shape template.
**Behavior pinned — `UnionOpDesc`**
| Surface | Contract |
| --- | --- |
| `operatorInfo` | name `"Union"`, group `SET_GROUP`, description mentions "Union" |
| Ports | one input, one non-blocking output |
| `getPhysicalOp` | wires `OpExecWithClassName("…operator.union.UnionOpExec")` |
| Partition requirement | empty (no hash-alignment forced; unlike Distinct / Difference / Intersect, Union preserves whatever the upstream produced) |
| Independent instances | no static state shared across `new UnionOpDesc` |
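
The "Independent instances" row rests on each descriptor drawing its identifier from `UUID.randomUUID()` at construction time, as the spec comments describe for `LogicalOp`. A minimal plain-Scala sketch of that contract follows; `PerInstanceDesc`, `SharedIdHolder`, and `SharedIdDesc` are hypothetical classes, not Texera's `LogicalOp`, and they contrast a per-instance id with the shared-id regression the spec is meant to catch:

```scala
import java.util.UUID

// Hypothetical descriptor: the id is drawn once per allocation, which is the
// behavior the spec comments attribute to LogicalOp's operatorId.
class PerInstanceDesc {
  val operatorId: String = s"op-${UUID.randomUUID()}"
}

// Hypothetical regression: the id is held in a singleton, so every
// descriptor instance reads the same value.
object SharedIdHolder {
  val sharedId: String = s"op-${UUID.randomUUID()}"
}

class SharedIdDesc {
  val operatorId: String = SharedIdHolder.sharedId
}

val a = new PerInstanceDesc
val b = new PerInstanceDesc
// Two independent random UUIDs, so the ids differ (what the spec asserts).
println(a.operatorId != b.operatorId) // true

val c = new SharedIdDesc
val d = new SharedIdDesc
// Same singleton value, so the ids are equal (what the spec would flag).
println(c.operatorId == d.operatorId) // true
```

The real specs assert the first inequality directly on `operatorIdentifier`; the second pair shows how an accidental singleton would make that assertion fail.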
**Behavior pinned — `DistinctOpDesc`**
| Surface | Contract |
| --- | --- |
| `operatorInfo` | name `"Distinct"`, group `CLEANING_GROUP`, description mentions "duplicate" |
| Ports | one input, one **blocking** output, on both `operatorInfo` and the `PhysicalOp` |
| `getPhysicalOp` | wires `OpExecWithClassName("…operator.distinct.DistinctOpExec")`; `partitionRequirement` is `List(Option(HashPartition()))`; `derivePartition` always returns `HashPartition` regardless of input partition kind |
| Independent instances | no static state shared across `new DistinctOpDesc` |
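
The `HashPartition` requirement matters because per-worker deduplication is only globally correct when equal tuples land on the same worker, which is the reasoning given in the spec's `derivePartition` comment. The following plain-Scala sketch models that argument. The `Row` alias, the `route` helper, and the round-robin counter-example are hypothetical and do not reproduce Texera's partitioner or `DistinctOpExec`:

```scala
// Hypothetical tuple representation: a row is a sequence of field values.
type Row = Seq[Any]

// Stand-in for HashPartition(): route a row by hashing the whole row.
// Equal rows have equal hash codes, so they always reach the same worker.
def route(row: Row, numWorkers: Int): Int =
  Math.floorMod(row.hashCode, numWorkers)

// Hash-partition first, then let each worker dedupe its own bucket.
def hashPartitionedDistinct(rows: Seq[Row], numWorkers: Int): Seq[Row] =
  rows.groupBy(route(_, numWorkers)).values.flatMap(_.distinct).toSeq

// Counter-example without hash alignment: round-robin placement can put
// copies of one row on different workers, where local dedup cannot see them.
def roundRobinDistinct(rows: Seq[Row], numWorkers: Int): Seq[Row] =
  rows.zipWithIndex
    .groupBy { case (_, i) => i % numWorkers }
    .values
    .flatMap(bucket => bucket.map(_._1).distinct)
    .toSeq

val input: Seq[Row] =
  Seq(Seq("a", 1), Seq("b", 2), Seq("a", 1), Seq("c", 3), Seq("b", 2))

println(hashPartitionedDistinct(input, 2).size) // 3
println(roundRobinDistinct(input, 2).size) // 4: Seq("b", 2) survives on two workers
```

Because the output of such a scheme is still grouped by the same hash, it is consistent with `derivePartition` reporting `HashPartition` whatever the upstream partition kind was.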
**Behavior pinned — `DifferenceOpDesc`**
| Surface | Contract |
| --- | --- |
| `operatorInfo` | name `"Difference"`, group `SET_GROUP`, description mentions "difference" |
| Ports | two inputs with `displayName` `"left"` (PortIdentity 0) and `"right"` (PortIdentity 1); one **blocking** output |
| `getPhysicalOp` | wires `OpExecWithClassName("…operator.difference.DifferenceOpExec")`; `partitionRequirement` is `List(Option(HashPartition()), Option(HashPartition()))` (both inputs); `derivePartition` always returns `HashPartition` |
| Schema propagation | accepts a single shared input schema and produces that schema on every output port; throws `IllegalArgumentException` when the two inputs do not share one schema |
| Independent instances | no static state shared across `new DifferenceOpDesc` |
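
The Difference contracts can be modeled in a few lines of plain Scala. This is a sketch under stated assumptions, not `DifferenceOpDesc`'s implementation: the hypothetical `Port` and `SimpleSchema` case classes stand in for `PortIdentity` and `Schema`, Scala's built-in `require` supplies the `IllegalArgumentException`, and the hypothetical `hashAlignedDifference` uses set semantics to illustrate the spec comment that both inputs must be hash-aligned so matching keys are compared on the same worker:

```scala
// Hypothetical stand-ins for Texera's PortIdentity and Schema.
final case class Port(id: Int)
final case class SimpleSchema(attributes: List[(String, String)])

// Schema propagation: exactly one distinct input schema, copied to every
// output port. `require` throws IllegalArgumentException on a mismatch.
def propagateDifferenceSchema(
    inputs: Map[Port, SimpleSchema],
    outputPorts: Set[Port]
): Map[Port, SimpleSchema] = {
  val distinctSchemas = inputs.values.toSet
  require(distinctSchemas.size == 1, s"inputs must share one schema: $distinctSchemas")
  val shared = distinctSchemas.head
  outputPorts.map(port => port -> shared).toMap
}

// Set difference computed per worker. Both sides are routed with the same
// hash function, so a left value and an equal right value always meet.
def hashAlignedDifference[A](left: Seq[A], right: Seq[A], numWorkers: Int): Set[A] = {
  def route(a: A): Int = Math.floorMod(a.hashCode, numWorkers)
  val rightByWorker = right.groupBy(route)
  left
    .groupBy(route)
    .toSeq
    .flatMap { case (worker, rows) =>
      rows.toSet[A] -- rightByWorker.getOrElse(worker, Seq.empty[A])
    }
    .toSet
}

// Two separately built but structurally equal schemas count as one schema,
// mirroring schemaA / schemaB in DifferenceOpDescSpec.
val colSchema = SimpleSchema(List("col" -> "STRING"))
val colSchemaCopy = SimpleSchema(List("col" -> "STRING"))
val otherSchema = SimpleSchema(List("other" -> "INTEGER"))

println(propagateDifferenceSchema(Map(Port(0) -> colSchema, Port(1) -> colSchemaCopy), Set(Port(0))))

val mismatch = scala.util.Try(
  propagateDifferenceSchema(Map(Port(0) -> colSchema, Port(1) -> otherSchema), Set(Port(0)))
)
println(mismatch.isFailure) // true

println(hashAlignedDifference(Seq(1, 2, 3, 4, 5), Seq(2, 4, 6), numWorkers = 3) == Set(1, 3, 5)) // true
```

With different routing on the two sides, a left value could be checked against the wrong right-side bucket and leak into the output, which is why the descriptor requires `HashPartition` on both ports.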
### Any related issues, documentation, discussions?
Closes #5734.
### How was this PR tested?
Pure unit-test additions; verified locally with:
- `sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.union.UnionOpDescSpec org.apache.texera.amber.operator.distinct.DistinctOpDescSpec org.apache.texera.amber.operator.difference.DifferenceOpDescSpec"`: 21 tests, all green
- `sbt scalafmtCheckAll` — clean
- CI to confirm
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7 [1M context])
---
.../operator/difference/DifferenceOpDescSpec.scala | 143 +++++++++++++++++++++
.../operator/distinct/DistinctOpDescSpec.scala | 109 ++++++++++++++++
.../amber/operator/union/UnionOpDescSpec.scala | 104 +++++++++++++++
3 files changed, 356 insertions(+)
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/difference/DifferenceOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/difference/DifferenceOpDescSpec.scala
new file mode 100644
index 0000000000..f6d17e94eb
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/difference/DifferenceOpDescSpec.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.difference
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{
+ HashPartition,
+ PortIdentity,
+ SinglePartition,
+ UnknownPartition
+}
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class DifferenceOpDescSpec extends AnyFlatSpec with Matchers {
+
+ private val workflowId = WorkflowIdentity(1L)
+ private val executionId = ExecutionIdentity(1L)
+
+ private val schemaA: Schema =
+ Schema().add(new Attribute("col", AttributeType.STRING))
+ private val schemaB: Schema =
+ Schema().add(new Attribute("col", AttributeType.STRING))
+ private val schemaDifferent: Schema =
+ Schema().add(new Attribute("other", AttributeType.INTEGER))
+
+ // ---------------------------------------------------------------------------
+ // operatorInfo
+ // ---------------------------------------------------------------------------
+
+ "DifferenceOpDesc.operatorInfo" should "advertise the Set group + difference description" in {
+ val info = (new DifferenceOpDesc).operatorInfo
+ info.userFriendlyName shouldBe "Difference"
+ info.operatorGroupName shouldBe OperatorGroupConstants.SET_GROUP
+ info.operatorDescription.toLowerCase should include("difference")
+ }
+
+ it should
+ "expose two input ports (left at PortIdentity 0, right at PortIdentity 1) and one blocking output" in {
+ val info = (new DifferenceOpDesc).operatorInfo
+ info.inputPorts should have length 2
+ info.inputPorts.map(_.id) shouldBe List(PortIdentity(), PortIdentity(1))
+ info.inputPorts.map(_.displayName) shouldBe List("left", "right")
+ info.outputPorts should have length 1
+ info.outputPorts.head.blocking shouldBe true
+ }
+
+ // ---------------------------------------------------------------------------
+ // getPhysicalOp — wiring + partitioning + schema propagation
+ // ---------------------------------------------------------------------------
+
+ "DifferenceOpDesc.getPhysicalOp" should
+ "wire the DifferenceOpExec class name into the OpExecInitInfo" in {
+ val physical = (new DifferenceOpDesc).getPhysicalOp(workflowId, executionId)
+ physical.opExecInitInfo match {
+ case OpExecWithClassName(className, _) =>
+ className shouldBe "org.apache.texera.amber.operator.difference.DifferenceOpExec"
+ case other =>
+ fail(s"expected OpExecWithClassName, got $other")
+ }
+ }
+
+ it should "require HashPartition on BOTH input ports" in {
+ // Set-difference semantics require both inputs to be hash-aligned so
+ // matching keys can be compared on the same worker.
+ val physical = (new DifferenceOpDesc).getPhysicalOp(workflowId, executionId)
+ physical.partitionRequirement shouldBe List(
+ Option(HashPartition()),
+ Option(HashPartition())
+ )
+ }
+
+ it should "derive HashPartition for the output regardless of input partition kinds" in {
+ val physical = (new DifferenceOpDesc).getPhysicalOp(workflowId, executionId)
+ physical.derivePartition(List(SinglePartition(), UnknownPartition())) shouldBe HashPartition()
+ physical.derivePartition(
+ List(HashPartition(List("a")), HashPartition(List("b")))
+ ) shouldBe HashPartition()
+ }
+
+ // ---------------------------------------------------------------------------
+ // Schema propagation
+ // ---------------------------------------------------------------------------
+
+ "DifferenceOpDesc schema propagation" should
+ "produce a single output schema equal to the (shared) input schema" in {
+ // When both inputs report the same schema, propagation succeeds and
+ // every output port receives that schema.
+ val op = new DifferenceOpDesc
+ val physical = op.getPhysicalOp(workflowId, executionId)
+ val propagateFn = physical.propagateSchema
+ val inputs = Map(PortIdentity() -> schemaA, PortIdentity(1) -> schemaB)
+ val outputs = propagateFn.func(inputs)
+ outputs.keySet shouldBe op.operatorInfo.outputPorts.map(_.id).toSet
+ outputs.values.toSet shouldBe Set(schemaA)
+ }
+
+ it should
+ "throw IllegalArgumentException when the two inputs do not share one schema" in {
+ val physical = (new DifferenceOpDesc).getPhysicalOp(workflowId, executionId)
+ val propagateFn = physical.propagateSchema
+ val mismatched =
+ Map(PortIdentity() -> schemaA, PortIdentity(1) -> schemaDifferent)
+ intercept[IllegalArgumentException] {
+ propagateFn.func(mismatched)
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Independent instances
+ // ---------------------------------------------------------------------------
+
+ "DifferenceOpDesc" should
+ "assign a fresh operatorIdentifier per instance (UUID-based id is not shared)" in {
+ // `LogicalOp` initializes `operatorId` from `UUID.randomUUID()` in
+ // its constructor body, so two `new DifferenceOpDesc` allocations
+ // must hold different identifiers. A regression to a static /
+ // shared id would surface here as the two ids being equal.
+ val a = new DifferenceOpDesc
+ val b = new DifferenceOpDesc
+ a.operatorIdentifier should not equal b.operatorIdentifier
+ }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/distinct/DistinctOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/distinct/DistinctOpDescSpec.scala
new file mode 100644
index 0000000000..2aba788acf
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/distinct/DistinctOpDescSpec.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.distinct
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{HashPartition, SinglePartition, UnknownPartition}
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class DistinctOpDescSpec extends AnyFlatSpec with Matchers {
+
+ private val workflowId = WorkflowIdentity(1L)
+ private val executionId = ExecutionIdentity(1L)
+
+ // ---------------------------------------------------------------------------
+ // operatorInfo — descriptor metadata
+ // ---------------------------------------------------------------------------
+
+ "DistinctOpDesc.operatorInfo" should
+ "advertise the user-friendly name and Cleaning group" in {
+ val info = (new DistinctOpDesc).operatorInfo
+ info.userFriendlyName shouldBe "Distinct"
+ info.operatorGroupName shouldBe OperatorGroupConstants.CLEANING_GROUP
+ info.operatorDescription.toLowerCase should include("duplicate")
+ }
+
+ it should "expose one input port and one blocking output port" in {
+ val info = (new DistinctOpDesc).operatorInfo
+ info.inputPorts should have length 1
+ info.outputPorts should have length 1
+ info.outputPorts.head.blocking shouldBe true
+ }
+
+ // ---------------------------------------------------------------------------
+ // getPhysicalOp — wiring to DistinctOpExec + partitioning contract
+ // ---------------------------------------------------------------------------
+
+ "DistinctOpDesc.getPhysicalOp" should
+ "wire the DistinctOpExec class name into the OpExecInitInfo" in {
+ val op = new DistinctOpDesc
+ val physical = op.getPhysicalOp(workflowId, executionId)
+ physical.opExecInitInfo match {
+ case OpExecWithClassName(className, _) =>
+ className shouldBe "org.apache.texera.amber.operator.distinct.DistinctOpExec"
+ case other =>
+ fail(s"expected OpExecWithClassName, got $other")
+ }
+ }
+
+ it should "require HashPartition on the single input port" in {
+ val physical = (new DistinctOpDesc).getPhysicalOp(workflowId, executionId)
+ physical.partitionRequirement shouldBe List(Option(HashPartition()))
+ }
+
+ it should "always derive HashPartition for the output regardless of input partitions" in {
+ // Distinct's dedup semantics depend on hash-alignment, so the
+ // derived output partition stays hash even when upstream inputs
+ // report differing partition kinds.
+ val physical = (new DistinctOpDesc).getPhysicalOp(workflowId, executionId)
+ physical.derivePartition(List(SinglePartition())) shouldBe HashPartition()
+ physical.derivePartition(List(UnknownPartition())) shouldBe HashPartition()
+ physical.derivePartition(List(HashPartition(List("col-a")))) shouldBe HashPartition()
+ }
+
+ it should "preserve the one input / one blocking output port shape from operatorInfo" in {
+ val op = new DistinctOpDesc
+ val physical = op.getPhysicalOp(workflowId, executionId)
+ physical.inputPorts should have size 1
+ physical.outputPorts should have size 1
+ // PhysicalOp.outputPorts is a Map[PortIdentity, (OutputPort, …, …)],
+ // so the blocking flag is on the first tuple element of the value.
+ val (outputPort, _, _) = physical.outputPorts.values.head
+ outputPort.blocking shouldBe true
+ }
+
+ // ---------------------------------------------------------------------------
+ // Independent instances
+ // ---------------------------------------------------------------------------
+
+ "DistinctOpDesc" should
+ "assign a fresh operatorIdentifier per instance (UUID-based id is not shared)" in {
+ // `LogicalOp` initializes `operatorId` from `UUID.randomUUID()` in
+ // its constructor body, so two `new DistinctOpDesc` allocations
+ // must hold different identifiers. A regression to a static /
+ // shared id would surface here as the two ids being equal.
+ val a = new DistinctOpDesc
+ val b = new DistinctOpDesc
+ a.operatorIdentifier should not equal b.operatorIdentifier
+ }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/union/UnionOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/union/UnionOpDescSpec.scala
new file mode 100644
index 0000000000..a9c58bbcda
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/union/UnionOpDescSpec.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.union
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class UnionOpDescSpec extends AnyFlatSpec with Matchers {
+
+ private val workflowId = WorkflowIdentity(1L)
+ private val executionId = ExecutionIdentity(1L)
+
+ // ---------------------------------------------------------------------------
+ // operatorInfo — descriptor metadata
+ // ---------------------------------------------------------------------------
+
+ "UnionOpDesc.operatorInfo" should "advertise the user-friendly name and Set group" in {
+ val info = (new UnionOpDesc).operatorInfo
+ info.userFriendlyName shouldBe "Union"
+ info.operatorGroupName shouldBe OperatorGroupConstants.SET_GROUP
+ info.operatorDescription should include("Union")
+ }
+
+ it should "expose exactly one input port and one (non-blocking) output port" in {
+ val info = (new UnionOpDesc).operatorInfo
+ info.inputPorts should have length 1
+ info.outputPorts should have length 1
+ info.outputPorts.head.blocking shouldBe false
+ }
+
+ // ---------------------------------------------------------------------------
+ // getPhysicalOp — wiring to UnionOpExec
+ // ---------------------------------------------------------------------------
+
+ "UnionOpDesc.getPhysicalOp" should
+ "wire the UnionOpExec class name into the OpExecInitInfo" in {
+ val op = new UnionOpDesc
+ val physical = op.getPhysicalOp(workflowId, executionId)
+ physical.opExecInitInfo match {
+ case OpExecWithClassName(className, _) =>
+ className shouldBe "org.apache.texera.amber.operator.union.UnionOpExec"
+ case other =>
+ fail(s"expected OpExecWithClassName, got $other")
+ }
+ }
+
+ it should "expose the same input/output port shape as operatorInfo" in {
+ val op = new UnionOpDesc
+ val info = op.operatorInfo
+ val physical = op.getPhysicalOp(workflowId, executionId)
+ // `physical.inputPorts` / `outputPorts` are `Map`s keyed by `PortIdentity`,
+ // while `operatorInfo` exposes lists, so compare the port counts.
+ physical.inputPorts.size shouldBe info.inputPorts.size
+ physical.outputPorts.size shouldBe info.outputPorts.size
+ }
+
+ it should "leave the partition requirement empty (no hash-alignment forced)" in {
+ // Unlike Distinct / Difference / Intersect in the same SET group,
+ // Union does NOT require its inputs to be hash-partitioned — the
+ // pass-through executor preserves whatever the upstream produced.
+ //
+ // Assert on the list itself (not just `.flatten`) so a regression
+ // that introduced a `None` entry (`List(None)` — same "no
+ // requirement" semantics but a different list shape) is caught here.
+ val physical = (new UnionOpDesc).getPhysicalOp(workflowId, executionId)
+ physical.partitionRequirement shouldBe empty
+ }
+
+ // ---------------------------------------------------------------------------
+ // Independent instances
+ // ---------------------------------------------------------------------------
+
+ "UnionOpDesc" should
+ "assign a fresh operatorIdentifier per instance (UUID-based id is not shared)" in {
+ // `LogicalOp` initializes `operatorId` from `UUID.randomUUID()` in
+ // its constructor body, so two `new UnionOpDesc` allocations must
+ // hold different identifiers. A regression to a static / shared id
+ // would surface here as the two ids being equal.
+ val a = new UnionOpDesc
+ val b = new UnionOpDesc
+ a.operatorIdentifier should not equal b.operatorIdentifier
+ }
+}