This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git

commit e3df0ceedcc6514bf2387bcdb8b29cdb51fa28e0
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 20 20:58:41 2026 -0700

    test(workflow-operator): add unit test coverage for control and aggregate operator descriptors (If, Dummy, Aggregate) (#5833)
    
    ### What changes were proposed in this PR?
    
    Pin behavior of three core non-source operator descriptors in
    `common/workflow-operator/`. No production-code changes.
    
    | Spec | Source class | Tests |
    | --- | --- | --- |
    | `IfOpDescSpec` | `IfOpDesc` | 4 |
    | `DummyOpDescSpec` | `DummyOpDesc` | 5 |
    | `AggregateOpDescSpec` | `AggregateOpDesc` | 4 |
    
    **Behavior pinned — `IfOpDesc`**
    
    | Surface | Contract |
    | --- | --- |
    | `operatorInfo` | `If`, CONTROL_GROUP; 2 inputs (`Condition` at
    `PortIdentity()`, data at `PortIdentity(1)`); 2 outputs `False`/`True` |
    | `conditionName` | defaults to `null` |
    | `getPhysicalOp` | wires `IfOpExec`, non-parallelizable, port
    identities carried |
    | Schema propagation | routes the **data** input's schema
    (`inputPorts.last`) to **both** outputs; the condition-port schema is
    dropped |
    
    **Behavior pinned — `DummyOpDesc`**
    
    | Surface | Contract |
    | --- | --- |
    | `operatorInfo` | `Dummy`, UTILITY_GROUP; all four
    `dynamicInputPorts`/`dynamicOutputPorts`/`supportReconfiguration`/`allowPortCustomization`
    flags `true` |
    | Port derivation | null port lists → single default
    `InputPort()`/`OutputPort()`; explicit `PortDescriptor` list → one port
    per element, indexed by position |
    | `dummyOperator` | defaults to `""` |
    | `getPhysicalOp` | the unimplemented `LogicalOp` stub →
    `NotImplementedError` |
    
    **Behavior pinned — `AggregateOpDesc`**
    
    | Surface | Contract |
    | --- | --- |
    | `operatorInfo` | `Aggregate`, AGGREGATE_GROUP, 1-in/1-out,
    `supportReconfiguration == false` |
    | `getPhysicalPlan` | a two-stage (`localAgg` + `globalAgg`) plan with
    one connecting link |
    | Schema propagation | group-by keys + per-aggregation result column;
    SUM keeps the input type, COUNT → INTEGER, AVERAGE → DOUBLE |
    
    **Note for reviewers:** each `AggregateOpDesc` test builds a **fresh**
    descriptor — `getPhysicalPlan` mutates `aggregations` via `getFinal`
    (e.g. COUNT → SUM) and is intentionally not idempotent.
    
    ### Any related issues, documentation, discussions?
    
    Closes #5830.
    
    ### How was this PR tested?
    
    - `sbt "WorkflowOperator/testOnly
    org.apache.texera.amber.operator.ifStatement.IfOpDescSpec
    org.apache.texera.amber.operator.dummy.DummyOpDescSpec
    org.apache.texera.amber.operator.aggregate.AggregateOpDescSpec"` — 13
    tests, all green
    - `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt
    "WorkflowOperator/Test/scalafix --check"` — clean
    - CI to confirm
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.8 [1M context])
---
 .../operator/aggregate/AggregateOpDescSpec.scala   | 90 ++++++++++++++++++++++
 .../amber/operator/dummy/DummyOpDescSpec.scala     | 80 +++++++++++++++++++
 .../amber/operator/ifStatement/IfOpDescSpec.scala  | 79 +++++++++++++++++++
 3 files changed, 249 insertions(+)

diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/aggregate/AggregateOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/aggregate/AggregateOpDescSpec.scala
new file mode 100644
index 0000000000..681a1aa2be
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/aggregate/AggregateOpDescSpec.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.aggregate
+
+import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.PortIdentity
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+/**
+ * Pins the observable contract of [[AggregateOpDesc]]: the metadata returned by
+ * `operatorInfo`, the size of the plan returned by `getPhysicalPlan`, and the output
+ * schema returned by `getExternalOutputSchemas` for SUM, COUNT and AVERAGE.
+ */
+class AggregateOpDescSpec extends AnyFlatSpec with Matchers {
+
+  // Fixed identities handed to getPhysicalPlan; the assertions do not depend on their values.
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  /** Builds an [[AggregationOperation]] applying `fn` to `attr` and writing to `result`. */
+  private def aggOp(fn: AggregationFunction, attr: String, result: String): AggregationOperation = {
+    val a = new AggregationOperation()
+    a.aggFunction = fn
+    a.attribute = attr
+    a.resultAttribute = result
+    a
+  }
+
+  // Each test builds a FRESH desc: getPhysicalPlan mutates `aggregations` (getFinal),
+  // so the descriptor is intentionally not idempotent across calls.
+  private def descWith(keys: List[String], aggs: AggregationOperation*): AggregateOpDesc = {
+    val d = new AggregateOpDesc
+    d.groupByKeys = keys
+    d.aggregations = aggs.toList
+    d
+  }
+
+  "AggregateOpDesc.operatorInfo" should "advertise the name and Aggregate group" in {
+    val info = (new AggregateOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Aggregate"
+    info.operatorDescription shouldBe "Calculate different types of aggregation values"
+    info.operatorGroupName shouldBe OperatorGroupConstants.AGGREGATE_GROUP
+    info.inputPorts should have length 1
+    info.outputPorts should have length 1
+    info.supportReconfiguration shouldBe false
+  }
+
+  "AggregateOpDesc.getPhysicalPlan" should
+    "build a two-stage (localAgg + globalAgg) plan with one connecting link" in {
+    val plan = descWith(List("city"), aggOp(AggregationFunction.SUM, "sales", "total"))
+      .getPhysicalPlan(workflowId, executionId)
+    // Only the plan's shape is checked here: two operators joined by a single link.
+    plan.operators should have size 2
+    plan.links should have size 1
+  }
+
+  "AggregateOpDesc schema propagation" should
+    "produce the group-by keys plus the aggregation result column (SUM keeps the input type)" in {
+    val input = Schema().add("city", AttributeType.STRING).add("sales", AttributeType.INTEGER)
+    val out = descWith(List("city"), aggOp(AggregationFunction.SUM, "sales", "total"))
+      .getExternalOutputSchemas(Map(PortIdentity() -> input))
+    out shouldBe Map(
+      PortIdentity() -> Schema()
+        .add("city", AttributeType.STRING)
+        .add("total", AttributeType.INTEGER)
+    )
+  }
+
+  it should "type a COUNT result as INTEGER and an AVERAGE result as DOUBLE" in {
+    // The input column is LONG, so both expected result types differ from the input type.
+    val input = Schema().add("v", AttributeType.LONG)
+    descWith(List.empty, aggOp(AggregationFunction.COUNT, "v", "cnt"))
+      .getExternalOutputSchemas(Map(PortIdentity() -> input)) shouldBe
+      Map(PortIdentity() -> Schema().add("cnt", AttributeType.INTEGER))
+    descWith(List.empty, aggOp(AggregationFunction.AVERAGE, "v", "avg"))
+      .getExternalOutputSchemas(Map(PortIdentity() -> input)) shouldBe
+      Map(PortIdentity() -> Schema().add("avg", AttributeType.DOUBLE))
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/dummy/DummyOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/dummy/DummyOpDescSpec.scala
new file mode 100644
index 0000000000..6f23261b27
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/dummy/DummyOpDescSpec.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.dummy
+
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PortIdentity, UnknownPartition}
+import org.apache.texera.amber.operator.PortDescription
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+/**
+ * Pins the observable contract of [[DummyOpDesc]]: the metadata and flags returned by
+ * `operatorInfo`, how its ports are derived from the optional port lists, the default
+ * of `dummyOperator`, and that `getPhysicalOp` throws `NotImplementedError`.
+ */
+class DummyOpDescSpec extends AnyFlatSpec with Matchers {
+
+  // Fixed identities handed to getPhysicalOp; that call is only expected to throw.
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  "DummyOpDesc.operatorInfo" should
+    "advertise the Utility group and enable all dynamic / reconfiguration / customization flags" in {
+    val info = (new DummyOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Dummy"
+    info.operatorDescription shouldBe "A dummy operator used as a placeholder."
+    info.operatorGroupName shouldBe OperatorGroupConstants.UTILITY_GROUP
+    info.dynamicInputPorts shouldBe true
+    info.dynamicOutputPorts shouldBe true
+    info.supportReconfiguration shouldBe true
+    info.allowPortCustomization shouldBe true
+  }
+
+  "DummyOpDesc" should
+    "derive a single default input/output port when the port lists are unset (null)" in {
+    // A freshly constructed descriptor has had no port lists assigned.
+    val info = (new DummyOpDesc).operatorInfo
+    info.inputPorts shouldBe List(InputPort())
+    info.outputPorts shouldBe List(OutputPort())
+  }
+
+  it should "derive ports from an explicit PortDescriptor list, indexed by position" in {
+    val d = new DummyOpDesc
+    d.inputPorts = List(
+      PortDescription(
+        "p0",
+        "first",
+        disallowMultiInputs = false,
+        isDynamicPort = false,
+        UnknownPartition()
+      )
+    )
+    // Only the input side with a single element is exercised, so position 0 is the
+    // only index checked.
+    val ports = d.operatorInfo.inputPorts
+    ports should have length 1
+    ports.head.id shouldBe PortIdentity(0)
+    ports.head.displayName shouldBe "first"
+  }
+
+  "DummyOpDesc.dummyOperator" should "default to the empty string" in {
+    (new DummyOpDesc).dummyOperator shouldBe ""
+  }
+
+  "DummyOpDesc.getPhysicalOp" should
+    "be the unimplemented LogicalOp stub (throws NotImplementedError)" in {
+    intercept[NotImplementedError] {
+      (new DummyOpDesc).getPhysicalOp(workflowId, executionId)
+    }
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/ifStatement/IfOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/ifStatement/IfOpDescSpec.scala
new file mode 100644
index 0000000000..49cd325d57
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/ifStatement/IfOpDescSpec.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.ifStatement
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.PortIdentity
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+/**
+ * Pins the observable contract of [[IfOpDesc]]: the ports and group returned by
+ * `operatorInfo`, the default of `conditionName`, the executor and port wiring of the
+ * physical operator returned by `getPhysicalOp`, and its schema propagation function.
+ */
+class IfOpDescSpec extends AnyFlatSpec with Matchers {
+
+  // Fixed identities handed to getPhysicalOp; the assertions do not depend on their values.
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  "IfOpDesc.operatorInfo" should
+    "advertise two inputs (Condition + data) and two outputs (False/True) in the Control group" in {
+    val info = (new IfOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "If"
+    info.operatorGroupName shouldBe OperatorGroupConstants.CONTROL_GROUP
+    info.inputPorts should have length 2
+    info.inputPorts.head.id shouldBe PortIdentity()
+    info.inputPorts.head.displayName shouldBe "Condition"
+    info.inputPorts.last.id shouldBe PortIdentity(1)
+    info.outputPorts.map(_.id) shouldBe List(PortIdentity(), PortIdentity(1))
+    info.outputPorts.head.displayName shouldBe "False"
+    info.outputPorts.last.displayName shouldBe "True"
+  }
+
+  "IfOpDesc.conditionName" should "default to null" in {
+    (new IfOpDesc).conditionName shouldBe null
+  }
+
+  "IfOpDesc.getPhysicalOp" should
+    "wire IfOpExec, be non-parallelizable, and carry the port identities" in {
+    val op = new IfOpDesc
+    op.conditionName = "ready"
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    physical.parallelizable shouldBe false
+    // Any init-info variant other than OpExecWithClassName fails the test explicitly.
+    physical.opExecInitInfo match {
+      case OpExecWithClassName(className, descString) =>
+        className shouldBe "org.apache.texera.amber.operator.ifStatement.IfOpExec"
+        descString should not be empty
+      case other => fail(s"expected OpExecWithClassName, got $other")
+    }
+    // The physical operator must expose exactly the port ids declared by operatorInfo.
+    physical.inputPorts.keySet shouldBe op.operatorInfo.inputPorts.map(_.id).toSet
+    physical.outputPorts.keySet shouldBe op.operatorInfo.outputPorts.map(_.id).toSet
+  }
+
+  "IfOpDesc schema propagation" should
+    "route the data input's schema (inputPorts.last) to BOTH outputs, dropping the condition schema" in {
+    val physical = (new IfOpDesc).getPhysicalOp(workflowId, executionId)
+    // The two input schemas are distinct so the assertion shows which one reaches the outputs.
+    val condSchema = Schema().add(new Attribute("cond", AttributeType.BOOLEAN))
+    val dataSchema = Schema().add(new Attribute("payload", AttributeType.STRING))
+    val out = physical.propagateSchema.func(
+      Map(PortIdentity() -> condSchema, PortIdentity(1) -> dataSchema)
+    )
+    out shouldBe Map(PortIdentity() -> dataSchema, PortIdentity(1) -> dataSchema)
+  }
+}

Reply via email to