This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 8ca6bb2aef test(workflow-operator): add unit test coverage for
row-count-shaping operator descriptors (#5814)
8ca6bb2aef is described below
commit 8ca6bb2aef8a4bace84721941a2254d176aaeb49
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 20 00:19:16 2026 -0700
test(workflow-operator): add unit test coverage for row-count-shaping
operator descriptors (#5814)
### What changes were proposed in this PR?
Pin behavior of three previously-untested descriptors that shape/reduce
output row counts in `common/workflow-operator/`. No production-code
changes.
| Spec | Source class | Tests |
| --- | --- | --- |
| `LimitOpDescSpec` | `LimitOpDesc` | 5 |
| `RandomKSamplingOpDescSpec` | `RandomKSamplingOpDesc` | 3 |
| `ReservoirSamplingOpDescSpec` | `ReservoirSamplingOpDesc` | 3 |
All three spec files follow the `<srcClassName>Spec.scala` one-to-one
convention.
**Behavior pinned — `LimitOpDesc`**
| Surface | Contract |
| --- | --- |
| `operatorInfo` | `Limit`, `CLEANING_GROUP`, 1-in/1-out, `supportReconfiguration == true` |
| Polymorphic deserialize | `{"operatorType":"Limit","limit":N}` via `classOf[LogicalOp]` yields a `LimitOpDesc` with `limit == N` |
| `getPhysicalOp` | non-parallelizable; wires `LimitOpExec`; ports carried forward |
| `runtimeReconfiguration` | returns `Success` with a `StateTransferFunc`; the func copies the running `count` from the old `LimitOpExec` to the new one (exercised end-to-end with two real exec instances) |
**Behavior pinned — `RandomKSamplingOpDesc`**
| Surface | Contract |
| --- | --- |
| `operatorInfo` | `Random K Sampling`, `UTILITY_GROUP`, `supportReconfiguration == true` |
| `percentage` round-trip | serializes under the spaced wire-key `random k sample percentage`; survives a polymorphic round-trip |
| `getPhysicalOp` | wires `RandomKSamplingOpExec`; ports carried forward |
**Behavior pinned — `ReservoirSamplingOpDesc`**
| Surface | Contract |
| --- | --- |
| `operatorInfo` | `Reservoir Sampling`, `UTILITY_GROUP`, `supportReconfiguration == false` (the intentional difference vs RandomKSampling — pinned so a future "fix" that flips it is caught) |
| `k` round-trip | serializes under the wire-key `number of item sampled in reservoir sampling` |
| `getPhysicalOp` | wires `ReservoirSamplingOpExec`; ports carried forward |
### Any related issues, documentation, discussions?
Closes #5807.
### How was this PR tested?
Pure unit-test additions; verified locally with:
- `sbt "WorkflowOperator/testOnly
org.apache.texera.amber.operator.limit.LimitOpDescSpec
org.apache.texera.amber.operator.randomksampling.RandomKSamplingOpDescSpec
org.apache.texera.amber.operator.reservoirsampling.ReservoirSamplingOpDescSpec"`
— 11 tests, all green
- `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt
"WorkflowOperator/Test/scalafix --check"` — clean
- CI to confirm
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.8 [1M context])
---
.../amber/operator/limit/LimitOpDescSpec.scala | 97 ++++++++++++++++++++++
.../RandomKSamplingOpDescSpec.scala | 76 +++++++++++++++++
.../ReservoirSamplingOpDescSpec.scala | 76 +++++++++++++++++
3 files changed, 249 insertions(+)
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/limit/LimitOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/limit/LimitOpDescSpec.scala
new file mode 100644
index 0000000000..f01e8f63f0
--- /dev/null
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/limit/LimitOpDescSpec.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.limit
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+import scala.util.Success
+
+class LimitOpDescSpec extends AnyFlatSpec with Matchers { // Unit tests pinning LimitOpDesc metadata, JSON deserialization, physical-op wiring, and runtime reconfiguration
+
+ private val workflowId = WorkflowIdentity(1L) // fixed identity handed to getPhysicalOp and runtimeReconfiguration below
+ private val executionId = ExecutionIdentity(1L) // fixed identity handed to getPhysicalOp and runtimeReconfiguration below
+
+ // LogicalOp carries @JsonTypeInfo(property = "operatorType"); deserialize
via the
+ // base type with the registered discriminator name "Limit".
+ private def limitDesc(n: Int): LimitOpDesc = // builds a LimitOpDesc from JSON whose limit field is n
+ objectMapper
+ .readValue(s"""{"operatorType":"Limit","limit":$n}""",
classOf[LogicalOp])
+ .asInstanceOf[LimitOpDesc] // readValue is asked for LogicalOp, so narrow to the concrete descriptor
+
+ "LimitOpDesc.operatorInfo" should
+ "advertise the name, Cleaning group, and reconfiguration support" in {
+ val info = (new LimitOpDesc).operatorInfo // default-constructed descriptor; no limit value is set for this check
+ info.userFriendlyName shouldBe "Limit"
+ info.operatorDescription shouldBe "Limit the number of output rows"
+ info.operatorGroupName shouldBe OperatorGroupConstants.CLEANING_GROUP
+ info.inputPorts should have length 1
+ info.outputPorts should have length 1
+ info.supportReconfiguration shouldBe true
+ }
+
+ "LimitOpDesc" should "deserialize the limit field through the polymorphic
base" in {
+ limitDesc(42).limit shouldBe 42
+ }
+
+ "LimitOpDesc.getPhysicalOp" should "be non-parallelizable and wire
LimitOpExec" in {
+ val physical = limitDesc(10).getPhysicalOp(workflowId, executionId)
+ physical.parallelizable shouldBe false
+ physical.opExecInitInfo match { // any init-info shape other than OpExecWithClassName fails the test
+ case OpExecWithClassName(className, descString) =>
+ className shouldBe "org.apache.texera.amber.operator.limit.LimitOpExec"
+ descString should not be empty // only non-emptiness is checked, not the content
+ case other => fail(s"expected OpExecWithClassName, got $other")
+ }
+ }
+
+ it should "carry forward the operatorInfo input/output port identities" in {
+ val op = limitDesc(10) // kept in a val so its operatorInfo can be compared against the physical op
+ val physical = op.getPhysicalOp(workflowId, executionId)
+ // Pin the actual port identities (not just counts).
+ physical.inputPorts.keySet shouldBe
op.operatorInfo.inputPorts.map(_.id).toSet
+ physical.outputPorts.keySet shouldBe
op.operatorInfo.outputPorts.map(_.id).toSet
+ }
+
+ "LimitOpDesc.runtimeReconfiguration" should
+ "return Success with a state-transfer func that copies the running row
count" in {
+ val desc = limitDesc(5) // passed below as both descriptor arguments of runtimeReconfiguration
+ val result = desc.runtimeReconfiguration(workflowId, executionId, desc,
desc)
+ result shouldBe a[Success[_]]
+ val (_, transferOpt) = result.get // first tuple element is ignored; second is the optional state-transfer function
+ transferOpt should not be empty
+
+ // Exercise the state-transfer func: a freshly-created exec starts at
count 0;
+ // the func must copy the old exec's count into the new one.
+ val descJson = """{"operatorType":"Limit","limit":5}"""
+ val oldExec = new LimitOpExec(descJson)
+ oldExec.count = 3 // seed a non-zero count so the copy is observable
+ val newExec = new LimitOpExec(descJson)
+ newExec.count shouldBe 0
+ val transfer = transferOpt.get
+ transfer(oldExec, newExec) // argument order is (old, new)
+ newExec.count shouldBe 3
+ }
+}
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/randomksampling/RandomKSamplingOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/randomksampling/RandomKSamplingOpDescSpec.scala
new file mode 100644
index 0000000000..94d450a955
--- /dev/null
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/randomksampling/RandomKSamplingOpDescSpec.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.randomksampling
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class RandomKSamplingOpDescSpec extends AnyFlatSpec with Matchers { // Unit tests pinning RandomKSamplingOpDesc metadata, the percentage JSON round-trip, and physical-op wiring
+
+ private val workflowId = WorkflowIdentity(1L) // fixed identity handed to getPhysicalOp below
+ private val executionId = ExecutionIdentity(1L) // fixed identity handed to getPhysicalOp below
+
+ // The percentage field uses a spaced @JsonProperty wire-key.
+ private val WireKey = "random k sample percentage"
+
+ "RandomKSamplingOpDesc.operatorInfo" should
+ "advertise the name, Utility group, and reconfiguration support" in {
+ val info = (new RandomKSamplingOpDesc).operatorInfo // default-constructed descriptor; percentage is not set for this check
+ info.userFriendlyName shouldBe "Random K Sampling"
+ info.operatorDescription shouldBe "random sampling with given percentage"
+ info.operatorGroupName shouldBe OperatorGroupConstants.UTILITY_GROUP
+ info.inputPorts should have length 1
+ info.outputPorts should have length 1
+ info.supportReconfiguration shouldBe true
+ }
+
+ "RandomKSamplingOpDesc" should
+ "serialize percentage under its spaced wire-key and round-trip it" in {
+ val d = new RandomKSamplingOpDesc
+ d.percentage = 25 // value asserted under WireKey in the JSON tree and again after the round-trip
+ val json = objectMapper.writeValueAsString(d)
+ val tree = objectMapper.readTree(json) // parsed tree lets the test inspect the raw field name
+ tree.has(WireKey) shouldBe true
+ tree.get(WireKey).asInt shouldBe 25
+ val restored = objectMapper.readValue(json, classOf[LogicalOp]) // read back through the LogicalOp base type; the next assertion checks the concrete class
+ restored shouldBe a[RandomKSamplingOpDesc]
+ restored.asInstanceOf[RandomKSamplingOpDesc].percentage shouldBe 25
+ }
+
+ "RandomKSamplingOpDesc.getPhysicalOp" should
+ "wire the RandomKSamplingOpExec class name and carry ports" in {
+ val d = new RandomKSamplingOpDesc
+ d.percentage = 50
+ val physical = d.getPhysicalOp(workflowId, executionId)
+ physical.opExecInitInfo match { // any init-info shape other than OpExecWithClassName fails the test
+ case OpExecWithClassName(className, descString) =>
+ className shouldBe
"org.apache.texera.amber.operator.randomksampling.RandomKSamplingOpExec"
+ descString should not be empty // only non-emptiness is checked, not the content
+ case other => fail(s"expected OpExecWithClassName, got $other")
+ }
+ physical.inputPorts.keySet shouldBe
d.operatorInfo.inputPorts.map(_.id).toSet
+ physical.outputPorts.keySet shouldBe
d.operatorInfo.outputPorts.map(_.id).toSet
+ }
+}
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpDescSpec.scala
new file mode 100644
index 0000000000..0c903d6ce8
--- /dev/null
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpDescSpec.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.reservoirsampling
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class ReservoirSamplingOpDescSpec extends AnyFlatSpec with Matchers { // Unit tests pinning ReservoirSamplingOpDesc metadata, the k JSON round-trip, and physical-op wiring
+
+ private val workflowId = WorkflowIdentity(1L) // fixed identity handed to getPhysicalOp below
+ private val executionId = ExecutionIdentity(1L) // fixed identity handed to getPhysicalOp below
+
+ private val WireKey = "number of item sampled in reservoir sampling" // JSON field name under which k is expected to be serialized (asserted below)
+
+ "ReservoirSamplingOpDesc.operatorInfo" should
+ "advertise the name, Utility group, and (intentionally) NOT support
reconfiguration" in {
+ val info = (new ReservoirSamplingOpDesc).operatorInfo // default-constructed descriptor; k is not set for this check
+ info.userFriendlyName shouldBe "Reservoir Sampling"
+ info.operatorDescription shouldBe "Reservoir Sampling with k items being
kept randomly"
+ info.operatorGroupName shouldBe OperatorGroupConstants.UTILITY_GROUP
+ info.inputPorts should have length 1
+ info.outputPorts should have length 1
+ // ReservoirSampling does not opt into reconfiguration (unlike
RandomKSampling),
+ // so it inherits the OperatorInfo default of false.
+ info.supportReconfiguration shouldBe false
+ }
+
+ "ReservoirSamplingOpDesc" should "serialize k under its wire-key and
round-trip it" in {
+ val d = new ReservoirSamplingOpDesc
+ d.k = 100 // value asserted under WireKey in the JSON tree and again after the round-trip
+ val json = objectMapper.writeValueAsString(d)
+ val tree = objectMapper.readTree(json) // parsed tree lets the test inspect the raw field name
+ tree.has(WireKey) shouldBe true
+ tree.get(WireKey).asInt shouldBe 100
+ val restored = objectMapper.readValue(json, classOf[LogicalOp]) // read back through the LogicalOp base type; the next assertion checks the concrete class
+ restored shouldBe a[ReservoirSamplingOpDesc]
+ restored.asInstanceOf[ReservoirSamplingOpDesc].k shouldBe 100
+ }
+
+ "ReservoirSamplingOpDesc.getPhysicalOp" should
+ "wire the ReservoirSamplingOpExec class name and carry ports" in {
+ val d = new ReservoirSamplingOpDesc
+ d.k = 10
+ val physical = d.getPhysicalOp(workflowId, executionId)
+ physical.opExecInitInfo match { // any init-info shape other than OpExecWithClassName fails the test
+ case OpExecWithClassName(className, descString) =>
+ className shouldBe
"org.apache.texera.amber.operator.reservoirsampling.ReservoirSamplingOpExec"
+ descString should not be empty // only non-emptiness is checked, not the content
+ case other => fail(s"expected OpExecWithClassName, got $other")
+ }
+ physical.inputPorts.keySet shouldBe
d.operatorInfo.inputPorts.map(_.id).toSet
+ physical.outputPorts.keySet shouldBe
d.operatorInfo.outputPorts.map(_.id).toSet
+ }
+}