This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5796-c8a4cd9f3a8ee5a832929767e8e720195af918b7 in repository https://gitbox.apache.org/repos/asf/texera.git
commit f5cf00a8b1b7a91d18b2505b4b3de3dfcfdcbdd2 Author: Xinyuan Lin <[email protected]> AuthorDate: Fri Jun 19 22:56:58 2026 -0700 test(workflow-operator): add unit test coverage for map/filter/flatmap abstract LogicalOp bases (#5796) ### What changes were proposed in this PR? Pin behavior of three tiny abstract `LogicalOp` bases in `common/workflow-operator/`. Two of them (`MapOpDesc`, `FilterOpDesc`) carry a `runtimeReconfiguration` override that delegates to `newOpDesc.getPhysicalOp` and wraps the result in `Success((_, None))` — drift in that delegation (swapped arg order, accidental `Failure` wrap, surprise `StateTransferFunc`) would land silently. No production-code changes. | Spec | Source class | Tests | | --- | --- | --- | | `FlatMapOpDescSpec` | `FlatMapOpDesc` | 4 | | `MapOpDescSpec` | `MapOpDesc` | 6 | | `FilterOpDescSpec` | `FilterOpDesc` | 6 | All three spec files follow the `<srcClassName>Spec.scala` one-to-one convention. **Behavior pinned (per descriptor)** | Surface | Contract | | --- | --- | | Abstract-class shape | `Modifier.isAbstract(classOf[X])` | | LogicalOp inheritance | upcast compiles; `case _: LogicalOp` matches a concrete subclass | | `runtimeReconfiguration` (Map/Filter only) | delegates to `newOpDesc.getPhysicalOp(workflowId, executionId)` | | Argument isolation | `oldOpDesc.getPhysicalOp` is **not** called | | Result wrapping | returns `Success` with `(_, None)` for the `StateTransferFunc` slot | | Exception propagation | a throw from `newOpDesc.getPhysicalOp` is **not** caught — it propagates up | The specs use minimal `StubMapDesc` / `StubFilterDesc` subclasses that record every `getPhysicalOp` call so the delegation is observable end-to-end. Sentinel returns use `null.asInstanceOf[PhysicalOp]` — the production code only passes the return value to `Success(...)` without inspecting it, so the cast is safe for these tests. ### Any related issues, documentation, discussions? Closes #5793. ### How was this PR tested? Pure unit-test additions; verified locally with: - `sbt \"WorkflowOperator/testOnly org.apache.texera.amber.operator.flatmap.FlatMapOpDescSpec org.apache.texera.amber.operator.map.MapOpDescSpec org.apache.texera.amber.operator.filter.FilterOpDescSpec\"` — 16 tests, all green - `sbt \"WorkflowOperator/Test/scalafmtCheck\"` — clean - CI to confirm ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Claude Code (Opus 4.7 [1M context]) --- .../amber/operator/filter/FilterOpDescSpec.scala | 116 ++++++++++++++++++++ .../amber/operator/flatmap/FlatMapOpDescSpec.scala | 78 ++++++++++++++ .../texera/amber/operator/map/MapOpDescSpec.scala | 120 +++++++++++++++++++++ 3 files changed, 314 insertions(+) diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/filter/FilterOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/filter/FilterOpDescSpec.scala new file mode 100644 index 0000000000..5200eb3490 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/filter/FilterOpDescSpec.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.filter + +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.PhysicalOp +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import java.lang.reflect.Modifier +import org.scalatest.flatspec.AnyFlatSpec + +class FilterOpDescSpec extends AnyFlatSpec { + + // --------------------------------------------------------------------------- + // Test-only concrete subclass — same shape as the MapOpDesc stub; the + // two abstract bases share the same `runtimeReconfiguration` contract. + // --------------------------------------------------------------------------- + + private class StubFilterDesc extends FilterOpDesc { + var calls: List[(WorkflowIdentity, ExecutionIdentity)] = Nil + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + calls = calls :+ ((workflowId, executionId)) + null.asInstanceOf[PhysicalOp] + } + override def operatorInfo: OperatorInfo = + OperatorInfo( + "StubFilter", + "stub filter", + OperatorGroupConstants.UTILITY_GROUP, + inputPorts = List.empty, + outputPorts = List.empty + ) + } + + private val workflowId = WorkflowIdentity(7L) + private val executionId = ExecutionIdentity(11L) + + // --------------------------------------------------------------------------- + // Abstract-class shape + // --------------------------------------------------------------------------- + + "FilterOpDesc" should "be declared abstract (cannot be instantiated directly)" in { + assert(Modifier.isAbstract(classOf[FilterOpDesc].getModifiers)) + } + + // --------------------------------------------------------------------------- + // Inheritance — FilterOpDesc is a LogicalOp + // --------------------------------------------------------------------------- + + it should "extend LogicalOp (compile-time enforced)" in { + val s: LogicalOp = new StubFilterDesc + assert(s != null) + } + + // --------------------------------------------------------------------------- + // runtimeReconfiguration — delegates to newOpDesc and ignores oldOpDesc + // --------------------------------------------------------------------------- + + "FilterOpDesc.runtimeReconfiguration" should + "delegate to newOpDesc.getPhysicalOp with the supplied workflow/execution ids" in { + val oldDesc = new StubFilterDesc + val newDesc = new StubFilterDesc + val result = oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc, newDesc) + assert(result.isSuccess) + assert(newDesc.calls == List((workflowId, executionId))) + } + + it should "not call oldOpDesc.getPhysicalOp" in { + val oldDesc = new StubFilterDesc + val newDesc = new StubFilterDesc + oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc, newDesc) + assert(oldDesc.calls == Nil) + } + + it should "return None for the StateTransferFunc slot" in { + val oldDesc = new StubFilterDesc + val newDesc = new StubFilterDesc + val result = oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc, newDesc) + val (_, transferOpt) = result.get + assert(transferOpt.isEmpty) + } + + it should "propagate exceptions from newOpDesc.getPhysicalOp (not catch them)" in { + val oldDesc = new StubFilterDesc + val throwingDesc = new StubFilterDesc { + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = throw new RuntimeException("sentinel:newDesc") + } + val ex = intercept[RuntimeException] { + oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc, throwingDesc) + } + assert(ex.getMessage == "sentinel:newDesc") + } +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/flatmap/FlatMapOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/flatmap/FlatMapOpDescSpec.scala new file mode 100644 index 0000000000..937c7a4c64 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/flatmap/FlatMapOpDescSpec.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.flatmap + +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import java.lang.reflect.Modifier +import org.scalatest.flatspec.AnyFlatSpec + +class FlatMapOpDescSpec extends AnyFlatSpec { + + // --------------------------------------------------------------------------- + // Test-only concrete subclass + // --------------------------------------------------------------------------- + + private class StubFlatMap extends FlatMapOpDesc { + override def operatorInfo: OperatorInfo = + OperatorInfo( + "StubFlatMap", + "stub flatmap", + OperatorGroupConstants.UTILITY_GROUP, + inputPorts = List.empty, + outputPorts = List.empty + ) + } + + // --------------------------------------------------------------------------- + // Abstract-class shape + // --------------------------------------------------------------------------- + + "FlatMapOpDesc" should "be declared abstract (cannot be instantiated directly)" in { + assert(Modifier.isAbstract(classOf[FlatMapOpDesc].getModifiers)) + } + + // --------------------------------------------------------------------------- + // Inheritance — FlatMapOpDesc is a LogicalOp + // --------------------------------------------------------------------------- + + it should "extend LogicalOp (compile-time enforced)" in { + val s: LogicalOp = new StubFlatMap + assert(s != null) + } + + it should "match the LogicalOp type-pattern" in { + val any: AnyRef = new StubFlatMap + val matched = any match { + case _: LogicalOp => true + case _ => false + } + assert(matched) + } + + it should "match the FlatMapOpDesc type-pattern" in { + val any: AnyRef = new StubFlatMap + val matched = any match { + case _: FlatMapOpDesc => true + case _ => false + } + assert(matched) + } +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/map/MapOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/map/MapOpDescSpec.scala new file mode 100644 index 0000000000..aa61154ca8 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/map/MapOpDescSpec.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.map + +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.PhysicalOp +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import java.lang.reflect.Modifier +import org.scalatest.flatspec.AnyFlatSpec + +class MapOpDescSpec extends AnyFlatSpec { + + // --------------------------------------------------------------------------- + // Test-only concrete subclass — records every call to `getPhysicalOp` + // and returns a stable sentinel so the spec can verify the delegation. + // + // Casting `null` to PhysicalOp is acceptable here: the production + // `runtimeReconfiguration` just wraps the return value in + // `Success((_, None))` without inspecting it. + // --------------------------------------------------------------------------- + + private class StubMapDesc extends MapOpDesc { + var calls: List[(WorkflowIdentity, ExecutionIdentity)] = Nil + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + calls = calls :+ ((workflowId, executionId)) + null.asInstanceOf[PhysicalOp] + } + override def operatorInfo: OperatorInfo = + OperatorInfo( + "StubMap", + "stub map", + OperatorGroupConstants.UTILITY_GROUP, + inputPorts = List.empty, + outputPorts = List.empty + ) + } + + private val workflowId = WorkflowIdentity(7L) + private val executionId = ExecutionIdentity(11L) + + // --------------------------------------------------------------------------- + // Abstract-class shape + // --------------------------------------------------------------------------- + + "MapOpDesc" should "be declared abstract (cannot be instantiated directly)" in { + assert(Modifier.isAbstract(classOf[MapOpDesc].getModifiers)) + } + + // --------------------------------------------------------------------------- + // Inheritance — MapOpDesc is a LogicalOp + // --------------------------------------------------------------------------- + + it should "extend LogicalOp (compile-time enforced)" in { + val s: LogicalOp = new StubMapDesc + assert(s != null) + } + + // --------------------------------------------------------------------------- + // runtimeReconfiguration — delegates to newOpDesc and ignores oldOpDesc + // --------------------------------------------------------------------------- + + "MapOpDesc.runtimeReconfiguration" should + "delegate to newOpDesc.getPhysicalOp with the supplied workflow/execution ids" in { + val oldDesc = new StubMapDesc + val newDesc = new StubMapDesc + val result = oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc, newDesc) + assert(result.isSuccess) + assert(newDesc.calls == List((workflowId, executionId))) + } + + it should "not call oldOpDesc.getPhysicalOp" in { + val oldDesc = new StubMapDesc + val newDesc = new StubMapDesc + oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc, newDesc) + assert(oldDesc.calls == Nil) + } + + it should "return None for the StateTransferFunc slot" in { + val oldDesc = new StubMapDesc + val newDesc = new StubMapDesc + val result = oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc, newDesc) + val (_, transferOpt) = result.get + assert(transferOpt.isEmpty) + } + + it should "propagate exceptions from newOpDesc.getPhysicalOp (not catch them)" in { + val oldDesc = new StubMapDesc + val throwingDesc = new StubMapDesc { + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = throw new RuntimeException("sentinel:newDesc") + } + val ex = intercept[RuntimeException] { + oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc, throwingDesc) + } + assert(ex.getMessage == "sentinel:newDesc") + } +}
