This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5738-8a90f1f667c44bc26c0faf9eee619392e3f57ddf
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 0efbc0f59cad0a660912fab63de04a4860d8b42c
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed Jun 17 15:07:43 2026 -0700

    test(workflow-operator): add unit test coverage for SET-family LogicalOp descriptors (#5738)
    
    ### What changes were proposed in this PR?
    
    Pin the behavior of three previously uncovered `LogicalOp` descriptors in
    the SET / cleaning operator family. Through `getPhysicalOp`, each
    descriptor wires an executor class name and a port shape and, where
    applicable, a partitioning and schema-propagation contract. No
    production-code changes.
    
    | Spec | Source class | Tests |
    | --- | --- | --- |
    | `UnionOpDescSpec` | `UnionOpDesc` | 6 |
    | `DistinctOpDescSpec` | `DistinctOpDesc` | 7 |
    | `DifferenceOpDescSpec` | `DifferenceOpDesc` | 8 |
    
    All three spec files follow the `<srcClassName>Spec.scala` one-to-one
    convention. `IntersectOpDescSpec` already exists and gave us the
    spec-shape template.
    
    **Behavior pinned — `UnionOpDesc`**
    
    | Surface | Contract |
    | --- | --- |
    | `operatorInfo` | name `"Union"`, group `SET_GROUP`, description mentions "Union" |
    | Ports | one input, one non-blocking output; the `PhysicalOp` exposes the same port counts |
    | `getPhysicalOp` | wires `OpExecWithClassName("…operator.union.UnionOpExec")` |
    | Partition requirement | empty (no hash-alignment forced; unlike Distinct / Difference / Intersect, Union preserves whatever the upstream produced) |
    | Independent instances | each `new UnionOpDesc` gets its own UUID-based `operatorIdentifier` (no shared static id) |
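
    The sketch below makes the three `partitionRequirement` shapes concrete. It is a minimal sketch only and uses hypothetical, simplified stand-ins for the partition types, not the real `org.apache.texera.amber.core.workflow` classes. The field name `hashAttributeNames` is an assumption. It shows why the Union spec asserts that the list is empty instead of checking only `.flatten`: `List(None)` also flattens to nothing, but its shape is different.

    ```scala
    // Hypothetical, simplified stand-ins; NOT the real Texera partition classes.
    object PartitionRequirementSketch {
      sealed trait PartitionInfo
      // Assumed field name for the hash keys.
      final case class HashPartition(hashAttributeNames: List[String] = Nil)
          extends PartitionInfo

      // One Option per input port, mirroring the shapes the specs pin.
      val union: List[Option[PartitionInfo]] = List.empty
      val distinct: List[Option[PartitionInfo]] = List(Option(HashPartition()))
      val difference: List[Option[PartitionInfo]] =
        List(Option(HashPartition()), Option(HashPartition()))

      // The regression shape UnionOpDescSpec guards against: still "no
      // requirement" once flattened, but no longer an empty list.
      val regressedUnion: List[Option[PartitionInfo]] = List(None)
    }
    ```

    Here `regressedUnion.flatten` equals `union.flatten`, yet `regressedUnion` is non-empty. An emptiness check (as in `partitionRequirement shouldBe empty`) therefore catches the change, and a flatten-based check would not.
    
    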
    
    **Behavior pinned — `DistinctOpDesc`**
    
    | Surface | Contract |
    | --- | --- |
    | `operatorInfo` | name `"Distinct"`, group `CLEANING_GROUP`, description mentions "duplicate" |
    | Ports | one input, one **blocking** output (same shape on the `PhysicalOp`) |
    | `getPhysicalOp` | wires `OpExecWithClassName("…operator.distinct.DistinctOpExec")`; `partitionRequirement` is `List(Option(HashPartition()))`; `derivePartition` always returns `HashPartition` regardless of input partition kind |
    | Independent instances | each `new DistinctOpDesc` gets its own UUID-based `operatorIdentifier` |
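
    The "regardless of input partition kind" contract amounts to a constant function over the input partitions. The sketch below models it with hypothetical, simplified stand-in types, not Texera's real `PartitionInfo` hierarchy. The name `alwaysHash` is illustrative only.

    ```scala
    // Hypothetical, simplified stand-ins; NOT Texera's real PartitionInfo types.
    object DerivePartitionSketch {
      sealed trait PartitionInfo
      final case class HashPartition(hashAttributeNames: List[String] = Nil)
          extends PartitionInfo
      final case class SinglePartition() extends PartitionInfo
      final case class UnknownPartition() extends PartitionInfo

      // Models the pinned contract: input partitions are ignored and the
      // output is always an un-keyed HashPartition.
      val alwaysHash: List[PartitionInfo] => PartitionInfo = _ => HashPartition()

      // The same input shapes the Distinct spec probes with.
      val probes: List[List[PartitionInfo]] = List(
        List(SinglePartition()),
        List(UnknownPartition()),
        List(HashPartition(List("col-a")))
      )

      // Derived output partition for each probe.
      def derivedForProbes: List[PartitionInfo] = probes.map(alwaysHash)
    }
    ```

    An input already keyed on `col-a` comes out as `HashPartition()` with no attribute names. This matches the spec's `shouldBe HashPartition()` expectation.
    
    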
    
    **Behavior pinned — `DifferenceOpDesc`**
    
    | Surface | Contract |
    | --- | --- |
    | `operatorInfo` | name `"Difference"`, group `SET_GROUP`, description mentions "difference"; two input ports with `displayName` `"left"` (PortIdentity 0) and `"right"` (PortIdentity 1); one **blocking** output |
    | `getPhysicalOp` | wires `OpExecWithClassName("…operator.difference.DifferenceOpExec")`; `partitionRequirement` is `List(Option(HashPartition()), Option(HashPartition()))` (both inputs); `derivePartition` always returns `HashPartition` |
    | Schema propagation | when both inputs share one schema, emits that schema on every output port; throws `IllegalArgumentException` when the two inputs do not share one schema |
    | Independent instances | each `new DifferenceOpDesc` gets its own UUID-based `operatorIdentifier` |
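
    The schema-propagation contract can be modeled as a small function. This is a minimal sketch, not the Texera implementation. `Schema`, `PortId` and `propagate` are hypothetical simplified stand-ins for the real `Schema`, `PortIdentity` and `propagateSchema`. In this sketch, two inputs "share one schema" when they are equal by value, as `schemaA` and `schemaB` are in the spec.

    ```scala
    // Hypothetical, simplified stand-ins; NOT Texera's Schema / PortIdentity.
    object SchemaPropagationSketch {
      final case class Schema(attributes: List[(String, String)])
      final case class PortId(id: Int)

      // Models the pinned contract: all inputs must share one schema, which
      // is then emitted on every output port.
      def propagate(
          inputs: Map[PortId, Schema],
          outputPorts: List[PortId]
      ): Map[PortId, Schema] = {
        val distinctSchemas = inputs.values.toSet
        // require throws IllegalArgumentException when its condition is false.
        require(
          distinctSchemas.size == 1,
          s"expected one shared input schema, got ${distinctSchemas.size}"
        )
        val shared = distinctSchemas.head
        outputPorts.map(port => port -> shared).toMap
      }
    }
    ```
    
    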
    
    ### Any related issues, documentation, discussions?
    
    Closes #5734.
    
    ### How was this PR tested?
    
    Pure unit-test additions; verified locally with:
    
    - `sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.union.UnionOpDescSpec org.apache.texera.amber.operator.distinct.DistinctOpDescSpec org.apache.texera.amber.operator.difference.DifferenceOpDescSpec"`: 21 tests, all green
    - `sbt scalafmtCheckAll` — clean
    - CI to confirm
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.7 [1M context])
---
 .../operator/difference/DifferenceOpDescSpec.scala | 143 +++++++++++++++++++++
 .../operator/distinct/DistinctOpDescSpec.scala     | 109 ++++++++++++++++
 .../amber/operator/union/UnionOpDescSpec.scala     | 104 +++++++++++++++
 3 files changed, 356 insertions(+)

diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/difference/DifferenceOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/difference/DifferenceOpDescSpec.scala
new file mode 100644
index 0000000000..f6d17e94eb
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/difference/DifferenceOpDescSpec.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.difference
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{
+  HashPartition,
+  PortIdentity,
+  SinglePartition,
+  UnknownPartition
+}
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class DifferenceOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  private val schemaA: Schema =
+    Schema().add(new Attribute("col", AttributeType.STRING))
+  private val schemaB: Schema =
+    Schema().add(new Attribute("col", AttributeType.STRING))
+  private val schemaDifferent: Schema =
+    Schema().add(new Attribute("other", AttributeType.INTEGER))
+
+  // ---------------------------------------------------------------------------
+  // operatorInfo
+  // ---------------------------------------------------------------------------
+
+  "DifferenceOpDesc.operatorInfo" should "advertise the Set group + difference description" in {
+    val info = (new DifferenceOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Difference"
+    info.operatorGroupName shouldBe OperatorGroupConstants.SET_GROUP
+    info.operatorDescription.toLowerCase should include("difference")
+  }
+
+  it should
+    "expose two input ports (left at PortIdentity 0, right at PortIdentity 1) and one blocking output" in {
+    val info = (new DifferenceOpDesc).operatorInfo
+    info.inputPorts should have length 2
+    info.inputPorts.map(_.id) shouldBe List(PortIdentity(), PortIdentity(1))
+    info.inputPorts.map(_.displayName) shouldBe List("left", "right")
+    info.outputPorts should have length 1
+    info.outputPorts.head.blocking shouldBe true
+  }
+
+  // ---------------------------------------------------------------------------
+  // getPhysicalOp — wiring + partitioning + schema propagation
+  // ---------------------------------------------------------------------------
+
+  "DifferenceOpDesc.getPhysicalOp" should
+    "wire the DifferenceOpExec class name into the OpExecInitInfo" in {
+    val physical = (new DifferenceOpDesc).getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo match {
+      case OpExecWithClassName(className, _) =>
+        className shouldBe "org.apache.texera.amber.operator.difference.DifferenceOpExec"
+      case other =>
+        fail(s"expected OpExecWithClassName, got $other")
+    }
+  }
+
+  it should "require HashPartition on BOTH input ports" in {
+    // Set-difference semantics require both inputs to be hash-aligned so
+    // matching keys can be compared on the same worker.
+    val physical = (new DifferenceOpDesc).getPhysicalOp(workflowId, executionId)
+    physical.partitionRequirement shouldBe List(
+      Option(HashPartition()),
+      Option(HashPartition())
+    )
+  }
+
+  it should "derive HashPartition for the output regardless of input partition kinds" in {
+    val physical = (new DifferenceOpDesc).getPhysicalOp(workflowId, executionId)
+    physical.derivePartition(List(SinglePartition(), UnknownPartition())) shouldBe HashPartition()
+    physical.derivePartition(
+      List(HashPartition(List("a")), HashPartition(List("b")))
+    ) shouldBe HashPartition()
+  }
+
+  // ---------------------------------------------------------------------------
+  // Schema propagation
+  // ---------------------------------------------------------------------------
+
+  "DifferenceOpDesc schema propagation" should
+    "produce a single output schema equal to the (shared) input schema" in {
+    // When both inputs report the same schema, propagation succeeds and
+    // every output port receives that schema.
+    val op = new DifferenceOpDesc
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    val propagateFn = physical.propagateSchema
+    val inputs = Map(PortIdentity() -> schemaA, PortIdentity(1) -> schemaB)
+    val outputs = propagateFn.func(inputs)
+    outputs.keySet shouldBe op.operatorInfo.outputPorts.map(_.id).toSet
+    outputs.values.toSet shouldBe Set(schemaA)
+  }
+
+  it should
+    "throw IllegalArgumentException when the two inputs do not share one schema" in {
+    val physical = (new DifferenceOpDesc).getPhysicalOp(workflowId, executionId)
+    val propagateFn = physical.propagateSchema
+    val mismatched =
+      Map(PortIdentity() -> schemaA, PortIdentity(1) -> schemaDifferent)
+    intercept[IllegalArgumentException] {
+      propagateFn.func(mismatched)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Independent instances
+  // ---------------------------------------------------------------------------
+
+  "DifferenceOpDesc" should
+    "assign a fresh operatorIdentifier per instance (UUID-based id is not shared)" in {
+    // `LogicalOp` initializes `operatorId` from `UUID.randomUUID()` in
+    // its constructor body, so two `new DifferenceOpDesc` allocations
+    // must hold different identifiers. A regression to a static /
+    // shared id would surface here as the two ids being equal.
+    val a = new DifferenceOpDesc
+    val b = new DifferenceOpDesc
+    a.operatorIdentifier should not equal b.operatorIdentifier
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/distinct/DistinctOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/distinct/DistinctOpDescSpec.scala
new file mode 100644
index 0000000000..2aba788acf
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/distinct/DistinctOpDescSpec.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.distinct
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{HashPartition, SinglePartition, UnknownPartition}
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class DistinctOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  // ---------------------------------------------------------------------------
+  // operatorInfo — descriptor metadata
+  // ---------------------------------------------------------------------------
+
+  "DistinctOpDesc.operatorInfo" should
+    "advertise the user-friendly name and Cleaning group" in {
+    val info = (new DistinctOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Distinct"
+    info.operatorGroupName shouldBe OperatorGroupConstants.CLEANING_GROUP
+    info.operatorDescription.toLowerCase should include("duplicate")
+  }
+
+  it should "expose one input port and one blocking output port" in {
+    val info = (new DistinctOpDesc).operatorInfo
+    info.inputPorts should have length 1
+    info.outputPorts should have length 1
+    info.outputPorts.head.blocking shouldBe true
+  }
+
+  // ---------------------------------------------------------------------------
+  // getPhysicalOp — wiring to DistinctOpExec + partitioning contract
+  // ---------------------------------------------------------------------------
+
+  "DistinctOpDesc.getPhysicalOp" should
+    "wire the DistinctOpExec class name into the OpExecInitInfo" in {
+    val op = new DistinctOpDesc
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo match {
+      case OpExecWithClassName(className, _) =>
+        className shouldBe "org.apache.texera.amber.operator.distinct.DistinctOpExec"
+      case other =>
+        fail(s"expected OpExecWithClassName, got $other")
+    }
+  }
+
+  it should "require HashPartition on the single input port" in {
+    val physical = (new DistinctOpDesc).getPhysicalOp(workflowId, executionId)
+    physical.partitionRequirement shouldBe List(Option(HashPartition()))
+  }
+
+  it should "always derive HashPartition for the output regardless of input partitions" in {
+    // Distinct's dedup semantics depend on hash-alignment, so the
+    // derived output partition stays hash even when upstream inputs
+    // report differing partition kinds.
+    val physical = (new DistinctOpDesc).getPhysicalOp(workflowId, executionId)
+    physical.derivePartition(List(SinglePartition())) shouldBe HashPartition()
+    physical.derivePartition(List(UnknownPartition())) shouldBe HashPartition()
+    physical.derivePartition(List(HashPartition(List("col-a")))) shouldBe HashPartition()
+  }
+
+  it should "preserve the one input / one blocking output port shape from operatorInfo" in {
+    val op = new DistinctOpDesc
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    physical.inputPorts should have size 1
+    physical.outputPorts should have size 1
+    // PhysicalOp.outputPorts is a Map[PortIdentity, (OutputPort, …, …)],
+    // so the blocking flag is on the first tuple element of the value.
+    val (outputPort, _, _) = physical.outputPorts.values.head
+    outputPort.blocking shouldBe true
+  }
+
+  // ---------------------------------------------------------------------------
+  // Independent instances
+  // ---------------------------------------------------------------------------
+
+  "DistinctOpDesc" should
+    "assign a fresh operatorIdentifier per instance (UUID-based id is not shared)" in {
+    // `LogicalOp` initializes `operatorId` from `UUID.randomUUID()` in
+    // its constructor body, so two `new DistinctOpDesc` allocations
+    // must hold different identifiers. A regression to a static /
+    // shared id would surface here as the two ids being equal.
+    val a = new DistinctOpDesc
+    val b = new DistinctOpDesc
+    a.operatorIdentifier should not equal b.operatorIdentifier
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/union/UnionOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/union/UnionOpDescSpec.scala
new file mode 100644
index 0000000000..a9c58bbcda
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/union/UnionOpDescSpec.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.union
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class UnionOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  // ---------------------------------------------------------------------------
+  // operatorInfo — descriptor metadata
+  // ---------------------------------------------------------------------------
+
+  "UnionOpDesc.operatorInfo" should "advertise the user-friendly name and Set group" in {
+    val info = (new UnionOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Union"
+    info.operatorGroupName shouldBe OperatorGroupConstants.SET_GROUP
+    info.operatorDescription should include("Union")
+  }
+
+  it should "expose exactly one input port and one (non-blocking) output port" in {
+    val info = (new UnionOpDesc).operatorInfo
+    info.inputPorts should have length 1
+    info.outputPorts should have length 1
+    info.outputPorts.head.blocking shouldBe false
+  }
+
+  // ---------------------------------------------------------------------------
+  // getPhysicalOp — wiring to UnionOpExec
+  // ---------------------------------------------------------------------------
+
+  "UnionOpDesc.getPhysicalOp" should
+    "wire the UnionOpExec class name into the OpExecInitInfo" in {
+    val op = new UnionOpDesc
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo match {
+      case OpExecWithClassName(className, _) =>
+        className shouldBe "org.apache.texera.amber.operator.union.UnionOpExec"
+      case other =>
+        fail(s"expected OpExecWithClassName, got $other")
+    }
+  }
+
+  it should "expose the same input/output port shape as operatorInfo" in {
+    val op = new UnionOpDesc
+    val info = op.operatorInfo
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    // `physical.inputPorts` / `outputPorts` are `Map`s — compare `size`
+    // (Int) directly; the descriptor's `operatorInfo.*.size` is also an
+    // Int, so no Long coercion is needed.
+    assert(physical.inputPorts.size == info.inputPorts.size)
+    assert(physical.outputPorts.size == info.outputPorts.size)
+  }
+
+  it should "leave the partition requirement empty (no hash-alignment forced)" in {
+    // Unlike Distinct / Difference / Intersect in the same SET group,
+    // Union does NOT require its inputs to be hash-partitioned — the
+    // pass-through executor preserves whatever the upstream produced.
+    //
+    // Assert on the list itself (not just `.flatten`) so a regression
+    // that introduced a `None` entry (`List(None)` — same "no
+    // requirement" semantics but a different list shape) is caught here.
+    val physical = (new UnionOpDesc).getPhysicalOp(workflowId, executionId)
+    physical.partitionRequirement shouldBe empty
+  }
+
+  // ---------------------------------------------------------------------------
+  // Independent instances
+  // ---------------------------------------------------------------------------
+
+  "UnionOpDesc" should
+    "assign a fresh operatorIdentifier per instance (UUID-based id is not shared)" in {
+    // `LogicalOp` initializes `operatorId` from `UUID.randomUUID()` in
+    // its constructor body, so two `new UnionOpDesc` allocations must
+    // hold different identifiers. A regression to a static / shared id
+    // would surface here as the two ids being equal.
+    val a = new UnionOpDesc
+    val b = new UnionOpDesc
+    a.operatorIdentifier should not equal b.operatorIdentifier
+  }
+}
