This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5400-d9676a732a03e97962fca0d7537bbf79cfd51dc4 in repository https://gitbox.apache.org/repos/asf/texera.git
commit 57640a74a333d0ba01b723704becac5d7e73238d
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 6 15:40:08 2026 -0700

test(workflow-core): add unit test coverage for core executor traits + factory (#5400)

### What changes were proposed in this PR?

`CoreExecutorReflectionSpec` (same package) already covered the basic defaults of `OperatorExecutor` / `SourceOperatorExecutor` and the `ExecFactory.newExecFromJavaClassName` happy paths + `JavaRuntimeCompilation` failure path. This PR extends the existing spec with five additional regression tests, grouped below by the four surfaces they pin, without duplicating any existing assertion.

| Surface | Pinned (new in this PR) |
| --- | --- |
| `OperatorExecutor.open` / `close` | A subclass override is reachable from every call site that invokes `open` / `close` on the trait (counter-based pin; a direct no-op-stub regression that bypassed dispatch wouldn't fail the existing default-stays-no-op tests). |
| `OperatorExecutor.processTupleMultiPort` | The default wrapper hands the input `port` verbatim to the subclass override (a regression that hard-coded `port = 0` would be invisible to the basic delegation test). Zero-emission and many-emission fan-out from the underlying `processTuple` both pass through with `Option.empty` port. |
| `SourceOperatorExecutor.processTupleMultiPort` | The input-side path never invokes `produceTuple` (counter-based pin; an accidental drain that discarded results wouldn't fail an empty-iterator check). |
| `ExecFactory.newExecFromJavaClassName` | `NoSuchMethodException` propagates when neither factory branch matches a fixture with only a `(Long)` constructor (the existing tests pin the success paths and `ClassNotFoundException`, not the "no constructor matches" branch). |

The PR body's earlier claim that "this package has no unit tests today" was inaccurate; the existing spec was missed during my initial coverage check because its `Reflection` suffix doesn't match any source-class name pattern. The new tests were therefore consolidated into the existing spec, per Copilot review.
No production code changed; this is test-only. ### Any related issues, documentation, discussions? Closes #5396 ### How was this PR tested? ``` sbt "WorkflowCore/Test/testOnly org.apache.texera.amber.core.executor.CoreExecutorReflectionSpec" # → 23 tests (18 pre-existing + 5 new), all pass sbt "WorkflowCore/Test/scalafmtCheck" # → clean ``` ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Claude Code (Claude Opus 4.7) --- .../core/executor/CoreExecutorReflectionSpec.scala | 86 ++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/executor/CoreExecutorReflectionSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/executor/CoreExecutorReflectionSpec.scala index 6357e336c6..3cec8b0e97 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/executor/CoreExecutorReflectionSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/executor/CoreExecutorReflectionSpec.scala @@ -51,6 +51,26 @@ class CoreExecutorReflectionSpec extends AnyFlatSpec { succeed } + "OperatorExecutor.open / close" should + "honor a subclass override (counting invocations across the lifecycle)" in { + // Pin that the defaults are real method dispatches, not direct calls + // to no-op stubs — a subclass override must be reachable from any + // call site that invokes open / close on the trait. 
+ class CountingExec extends OperatorExecutor { + var opens = 0 + var closes = 0 + override def open(): Unit = opens += 1 + override def close(): Unit = closes += 1 + override def processTuple(t: Tuple, p: Int): Iterator[TupleLike] = Iterator.empty + } + val exec = new CountingExec + exec.open() + exec.open() + exec.close() + assert(exec.opens == 2) + assert(exec.closes == 1) + } + "OperatorExecutor.produceStateOnStart" should "default to None for any port" in { val exec = new IdentityExec assert(exec.produceStateOnStart(0).isEmpty) @@ -71,6 +91,36 @@ class CoreExecutorReflectionSpec extends AnyFlatSpec { assert(out.head._2.isEmpty) } + it should "forward the input port id to the underlying processTuple verbatim" in { + // Pin: the default wrapper must hand the same `port` to the subclass + // override, not substitute a constant. A regression that hard-codes + // port = 0 would be invisible to the basic delegation test above. + var seenPort = -1 + val exec = new OperatorExecutor { + override def processTuple(t: Tuple, p: Int): Iterator[TupleLike] = { + seenPort = p + Iterator.single(t) + } + } + exec.processTupleMultiPort(tuple(0), port = 9).toList + assert(seenPort == 9) + } + + it should "produce as many output pairs as the underlying processTuple emits (zero / many fan-out)" in { + val empty = new OperatorExecutor { + override def processTuple(t: Tuple, p: Int): Iterator[TupleLike] = Iterator.empty + } + assert(empty.processTupleMultiPort(tuple(0), 0).isEmpty) + + val fanOut = new OperatorExecutor { + override def processTuple(t: Tuple, p: Int): Iterator[TupleLike] = + Iterator(tuple(1), tuple(2), tuple(3)) + } + val outs = fanOut.processTupleMultiPort(tuple(0), 0).toList + assert(outs.size == 3) + assert(outs.forall(_._2.isEmpty), "every emitted pair must have port = None under the default") + } + "OperatorExecutor.produceStateOnFinish" should "default to None for any port" in { val exec = new IdentityExec assert(exec.produceStateOnFinish(0).isEmpty) @@ -105,6 
+155,22 @@ class CoreExecutorReflectionSpec extends AnyFlatSpec { assert(exec.processTupleMultiPort(tuple(99), 0).isEmpty) } + it should "never invoke produceTuple on the input-side path" in { + // Sources only emit through onFinishMultiPort. A regression in + // processTupleMultiPort that accidentally drained produceTuple + // (discarding the output) wouldn't be caught by an empty-iterator + // check alone — pin via a counter. + var producedCalls = 0 + val src = new SourceOperatorExecutor { + override def produceTuple(): Iterator[TupleLike] = { + producedCalls += 1 + Iterator.empty + } + } + src.processTupleMultiPort(tuple(0), 0).toList + assert(producedCalls == 0, "input-side path must not call produceTuple") + } + "SourceOperatorExecutor.onFinishMultiPort" should "delegate to produceTuple with no port routing" in { val exec = new CountingSource val out = exec.onFinishMultiPort(0).toList @@ -163,6 +229,18 @@ class CoreExecutorReflectionSpec extends AnyFlatSpec { } } + it should "propagate NoSuchMethodException when no constructor matches either factory branch" in { + // A fixture with only a `(Long)` constructor — neither the (no-arg) + // branch (which also tries (Int, Int) on catch) nor the (String) + // branch (which falls back to (String, Int, Int)) matches a single + // `Long` argument. Both throws propagate as NoSuchMethodException. + assertThrows[NoSuchMethodException] { + ExecFactory.newExecFromJavaClassName( + classOf[CoreExecutorReflectionSpec.LongArgExec].getName + ) + } + } + // --------------------------------------------------------------------------- // JavaRuntimeCompilation.compileCode // @@ -226,4 +304,12 @@ private object CoreExecutorReflectionSpec { port: Int ): Iterator[org.apache.texera.amber.core.tuple.TupleLike] = Iterator.empty } + + /** Only a `(Long)` constructor — neither factory branch matches. 
*/ + class LongArgExec(val n: Long) extends OperatorExecutor { + override def processTuple( + tuple: org.apache.texera.amber.core.tuple.Tuple, + port: Int + ): Iterator[org.apache.texera.amber.core.tuple.TupleLike] = Iterator.empty + } }
