This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5814-41123c0ad1ddfd3a805a562511d27660e4b14b73 in repository https://gitbox.apache.org/repos/asf/texera.git
commit e2e87d604edfef8b18d552dbf3652302f1833136
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Jun 19 23:59:07 2026 -0700

test(workflow-operator): add unit test coverage for row-count-shaping operator descriptors (#5814)

### What changes were proposed in this PR?

Pin behavior of three previously-untested descriptors that shape/reduce output row counts in `common/workflow-operator/`. No production-code changes.

| Spec | Source class | Tests |
| --- | --- | --- |
| `LimitOpDescSpec` | `LimitOpDesc` | 5 |
| `RandomKSamplingOpDescSpec` | `RandomKSamplingOpDesc` | 3 |
| `ReservoirSamplingOpDescSpec` | `ReservoirSamplingOpDesc` | 3 |

All three spec files follow the `<srcClassName>Spec.scala` one-to-one convention.

**Behavior pinned — `LimitOpDesc`**

| Surface | Contract |
| --- | --- |
| `operatorInfo` | `Limit`, `CLEANING_GROUP`, 1-in/1-out, `supportReconfiguration == true` |
| Polymorphic deserialize | `{"operatorType":"Limit","limit":N}` via `classOf[LogicalOp]` yields a `LimitOpDesc` with `limit == N` |
| `getPhysicalOp` | non-parallelizable; wires `LimitOpExec`; ports carried forward |
| `runtimeReconfiguration` | returns `Success` with a `StateTransferFunc`; the func copies the running `count` from the old `LimitOpExec` to the new one (exercised end-to-end with two real exec instances) |

**Behavior pinned — `RandomKSamplingOpDesc`**

| Surface | Contract |
| --- | --- |
| `operatorInfo` | `Random K Sampling`, `UTILITY_GROUP`, `supportReconfiguration == true` |
| `percentage` round-trip | serializes under the spaced wire-key `random k sample percentage`; survives a polymorphic round-trip |
| `getPhysicalOp` | wires `RandomKSamplingOpExec`; ports carried forward |

**Behavior pinned — `ReservoirSamplingOpDesc`**

| Surface | Contract |
| --- | --- |
| `operatorInfo` | `Reservoir Sampling`, `UTILITY_GROUP`, `supportReconfiguration == false` (the intentional difference vs RandomKSampling — pinned so a future "fix" that flips it is caught) |
| `k` round-trip | serializes under the wire-key `number of item sampled in reservoir sampling` |
| `getPhysicalOp` | wires `ReservoirSamplingOpExec`; ports carried forward |

### Any related issues, documentation, discussions?

Closes #5807.

### How was this PR tested?

Pure unit-test additions; verified locally with:

- `sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.limit.LimitOpDescSpec org.apache.texera.amber.operator.randomksampling.RandomKSamplingOpDescSpec org.apache.texera.amber.operator.reservoirsampling.ReservoirSamplingOpDescSpec"` — 11 tests, all green
- `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt "WorkflowOperator/Test/scalafix --check"` — clean
- CI to confirm

### Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.8 [1M context])

---
.../amber/operator/limit/LimitOpDescSpec.scala | 97 ++++++++++++++++++++++
.../RandomKSamplingOpDescSpec.scala | 76 +++++++++++++++++
.../ReservoirSamplingOpDescSpec.scala | 76 +++++++++++++++++
3 files changed, 249 insertions(+)

diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/limit/LimitOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/limit/LimitOpDescSpec.scala new file mode 100644 index 0000000000..f01e8f63f0 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/limit/LimitOpDescSpec.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.limit + +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import scala.util.Success + +class LimitOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + // LogicalOp carries @JsonTypeInfo(property = "operatorType"); deserialize via the + // base type with the registered discriminator name "Limit". 
+ private def limitDesc(n: Int): LimitOpDesc = + objectMapper + .readValue(s"""{"operatorType":"Limit","limit":$n}""", classOf[LogicalOp]) + .asInstanceOf[LimitOpDesc] + + "LimitOpDesc.operatorInfo" should + "advertise the name, Cleaning group, and reconfiguration support" in { + val info = (new LimitOpDesc).operatorInfo + info.userFriendlyName shouldBe "Limit" + info.operatorDescription shouldBe "Limit the number of output rows" + info.operatorGroupName shouldBe OperatorGroupConstants.CLEANING_GROUP + info.inputPorts should have length 1 + info.outputPorts should have length 1 + info.supportReconfiguration shouldBe true + } + + "LimitOpDesc" should "deserialize the limit field through the polymorphic base" in { + limitDesc(42).limit shouldBe 42 + } + + "LimitOpDesc.getPhysicalOp" should "be non-parallelizable and wire LimitOpExec" in { + val physical = limitDesc(10).getPhysicalOp(workflowId, executionId) + physical.parallelizable shouldBe false + physical.opExecInitInfo match { + case OpExecWithClassName(className, descString) => + className shouldBe "org.apache.texera.amber.operator.limit.LimitOpExec" + descString should not be empty + case other => fail(s"expected OpExecWithClassName, got $other") + } + } + + it should "carry forward the operatorInfo input/output port identities" in { + val op = limitDesc(10) + val physical = op.getPhysicalOp(workflowId, executionId) + // Pin the actual port identities (not just counts). 
+ physical.inputPorts.keySet shouldBe op.operatorInfo.inputPorts.map(_.id).toSet + physical.outputPorts.keySet shouldBe op.operatorInfo.outputPorts.map(_.id).toSet + } + + "LimitOpDesc.runtimeReconfiguration" should + "return Success with a state-transfer func that copies the running row count" in { + val desc = limitDesc(5) + val result = desc.runtimeReconfiguration(workflowId, executionId, desc, desc) + result shouldBe a[Success[_]] + val (_, transferOpt) = result.get + transferOpt should not be empty + + // Exercise the state-transfer func: a freshly-created exec starts at count 0; + // the func must copy the old exec's count into the new one. + val descJson = """{"operatorType":"Limit","limit":5}""" + val oldExec = new LimitOpExec(descJson) + oldExec.count = 3 + val newExec = new LimitOpExec(descJson) + newExec.count shouldBe 0 + val transfer = transferOpt.get + transfer(oldExec, newExec) + newExec.count shouldBe 3 + } +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/randomksampling/RandomKSamplingOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/randomksampling/RandomKSamplingOpDescSpec.scala new file mode 100644 index 0000000000..94d450a955 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/randomksampling/RandomKSamplingOpDescSpec.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.randomksampling + +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class RandomKSamplingOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + // The percentage field uses a spaced @JsonProperty wire-key. 
+ private val WireKey = "random k sample percentage" + + "RandomKSamplingOpDesc.operatorInfo" should + "advertise the name, Utility group, and reconfiguration support" in { + val info = (new RandomKSamplingOpDesc).operatorInfo + info.userFriendlyName shouldBe "Random K Sampling" + info.operatorDescription shouldBe "random sampling with given percentage" + info.operatorGroupName shouldBe OperatorGroupConstants.UTILITY_GROUP + info.inputPorts should have length 1 + info.outputPorts should have length 1 + info.supportReconfiguration shouldBe true + } + + "RandomKSamplingOpDesc" should + "serialize percentage under its spaced wire-key and round-trip it" in { + val d = new RandomKSamplingOpDesc + d.percentage = 25 + val json = objectMapper.writeValueAsString(d) + val tree = objectMapper.readTree(json) + tree.has(WireKey) shouldBe true + tree.get(WireKey).asInt shouldBe 25 + val restored = objectMapper.readValue(json, classOf[LogicalOp]) + restored shouldBe a[RandomKSamplingOpDesc] + restored.asInstanceOf[RandomKSamplingOpDesc].percentage shouldBe 25 + } + + "RandomKSamplingOpDesc.getPhysicalOp" should + "wire the RandomKSamplingOpExec class name and carry ports" in { + val d = new RandomKSamplingOpDesc + d.percentage = 50 + val physical = d.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithClassName(className, descString) => + className shouldBe "org.apache.texera.amber.operator.randomksampling.RandomKSamplingOpExec" + descString should not be empty + case other => fail(s"expected OpExecWithClassName, got $other") + } + physical.inputPorts.keySet shouldBe d.operatorInfo.inputPorts.map(_.id).toSet + physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet + } +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpDescSpec.scala 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpDescSpec.scala new file mode 100644 index 0000000000..0c903d6ce8 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpDescSpec.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.amber.operator.reservoirsampling + +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class ReservoirSamplingOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + private val WireKey = "number of item sampled in reservoir sampling" + + "ReservoirSamplingOpDesc.operatorInfo" should + "advertise the name, Utility group, and (intentionally) NOT support reconfiguration" in { + val info = (new ReservoirSamplingOpDesc).operatorInfo + info.userFriendlyName shouldBe "Reservoir Sampling" + info.operatorDescription shouldBe "Reservoir Sampling with k items being kept randomly" + info.operatorGroupName shouldBe OperatorGroupConstants.UTILITY_GROUP + info.inputPorts should have length 1 + info.outputPorts should have length 1 + // ReservoirSampling does not opt into reconfiguration (unlike RandomKSampling), + // so it inherits the OperatorInfo default of false. 
+ info.supportReconfiguration shouldBe false + } + + "ReservoirSamplingOpDesc" should "serialize k under its wire-key and round-trip it" in { + val d = new ReservoirSamplingOpDesc + d.k = 100 + val json = objectMapper.writeValueAsString(d) + val tree = objectMapper.readTree(json) + tree.has(WireKey) shouldBe true + tree.get(WireKey).asInt shouldBe 100 + val restored = objectMapper.readValue(json, classOf[LogicalOp]) + restored shouldBe a[ReservoirSamplingOpDesc] + restored.asInstanceOf[ReservoirSamplingOpDesc].k shouldBe 100 + } + + "ReservoirSamplingOpDesc.getPhysicalOp" should + "wire the ReservoirSamplingOpExec class name and carry ports" in { + val d = new ReservoirSamplingOpDesc + d.k = 10 + val physical = d.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithClassName(className, descString) => + className shouldBe "org.apache.texera.amber.operator.reservoirsampling.ReservoirSamplingOpExec" + descString should not be empty + case other => fail(s"expected OpExecWithClassName, got $other") + } + physical.inputPorts.keySet shouldBe d.operatorInfo.inputPorts.map(_.id).toSet + physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet + } +}
