This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 57640a74a3 test(workflow-core): add unit test coverage for core
executor traits + factory (#5400)
57640a74a3 is described below
commit 57640a74a333d0ba01b723704becac5d7e73238d
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 6 15:40:08 2026 -0700
test(workflow-core): add unit test coverage for core executor traits +
factory (#5400)
### What changes were proposed in this PR?
`CoreExecutorReflectionSpec` (same package) already covered the basic
defaults of `OperatorExecutor` / `SourceOperatorExecutor` and the
`ExecFactory.newExecFromJavaClassName` happy paths +
`JavaRuntimeCompilation` failure path. This PR extends the existing spec
with five additional regression pins, without duplicating any existing
assertion.
| Surface | Pinned (new in this PR) |
| --- | --- |
| `OperatorExecutor.open` / `close` | A subclass override is reachable
from every call site that invokes `open` / `close` on the trait
(counter-based pin; a direct no-op-stub regression that bypassed
dispatch wouldn't fail the existing default-stays-no-op tests). |
| `OperatorExecutor.processTupleMultiPort` | The default wrapper hands
the input `port` verbatim to the subclass override (a regression that
hard-coded `port = 0` would be invisible to the basic delegation test).
Zero-emission and many-emission fan-out from the underlying
`processTuple` both pass through with `Option.empty` port. |
| `SourceOperatorExecutor.processTupleMultiPort` | The input-side path
never invokes `produceTuple` (counter-based pin; an accidental drain
that discarded results wouldn't fail an empty-iterator check). |
| `ExecFactory.newExecFromJavaClassName` | `NoSuchMethodException`
propagates when neither factory branch matches a fixture with only a
`(Long)` constructor (the existing tests pin the success paths and
`ClassNotFoundException`, not the "no constructor matches" branch). |
The PR body's earlier claim that "this package has no unit tests today"
was inaccurate; the existing spec was missed during my initial coverage
check because its `Reflection` suffix doesn't match any source-class
name pattern. Consolidated per Copilot review.
No production code changed; this is test-only.
### Any related issues, documentation, discussions?
Closes #5396
### How was this PR tested?
```
sbt "WorkflowCore/Test/testOnly
org.apache.texera.amber.core.executor.CoreExecutorReflectionSpec"
# → 23 tests (18 pre-existing + 5 new), all pass
sbt "WorkflowCore/Test/scalafmtCheck"
# → clean
```
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
---
.../core/executor/CoreExecutorReflectionSpec.scala | 86 ++++++++++++++++++++++
1 file changed, 86 insertions(+)
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/executor/CoreExecutorReflectionSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/executor/CoreExecutorReflectionSpec.scala
index 6357e336c6..3cec8b0e97 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/executor/CoreExecutorReflectionSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/executor/CoreExecutorReflectionSpec.scala
@@ -51,6 +51,26 @@ class CoreExecutorReflectionSpec extends AnyFlatSpec {
succeed
}
+ "OperatorExecutor.open / close" should
+ "honor a subclass override (counting invocations across the lifecycle)" in
{
+ // Pin that the defaults are real method dispatches, not direct calls
+ // to no-op stubs — a subclass override must be reachable from any
+ // call site that invokes open / close on the trait.
+ class CountingExec extends OperatorExecutor {
+ var opens = 0
+ var closes = 0
+ override def open(): Unit = opens += 1
+ override def close(): Unit = closes += 1
+ override def processTuple(t: Tuple, p: Int): Iterator[TupleLike] =
Iterator.empty
+ }
+ val exec = new CountingExec
+ exec.open()
+ exec.open()
+ exec.close()
+ assert(exec.opens == 2)
+ assert(exec.closes == 1)
+ }
+
"OperatorExecutor.produceStateOnStart" should "default to None for any port"
in {
val exec = new IdentityExec
assert(exec.produceStateOnStart(0).isEmpty)
@@ -71,6 +91,36 @@ class CoreExecutorReflectionSpec extends AnyFlatSpec {
assert(out.head._2.isEmpty)
}
+ it should "forward the input port id to the underlying processTuple
verbatim" in {
+ // Pin: the default wrapper must hand the same `port` to the subclass
+ // override, not substitute a constant. A regression that hard-codes
+ // port = 0 would be invisible to the basic delegation test above.
+ var seenPort = -1
+ val exec = new OperatorExecutor {
+ override def processTuple(t: Tuple, p: Int): Iterator[TupleLike] = {
+ seenPort = p
+ Iterator.single(t)
+ }
+ }
+ exec.processTupleMultiPort(tuple(0), port = 9).toList
+ assert(seenPort == 9)
+ }
+
+ it should "produce as many output pairs as the underlying processTuple emits
(zero / many fan-out)" in {
+ val empty = new OperatorExecutor {
+ override def processTuple(t: Tuple, p: Int): Iterator[TupleLike] =
Iterator.empty
+ }
+ assert(empty.processTupleMultiPort(tuple(0), 0).isEmpty)
+
+ val fanOut = new OperatorExecutor {
+ override def processTuple(t: Tuple, p: Int): Iterator[TupleLike] =
+ Iterator(tuple(1), tuple(2), tuple(3))
+ }
+ val outs = fanOut.processTupleMultiPort(tuple(0), 0).toList
+ assert(outs.size == 3)
+ assert(outs.forall(_._2.isEmpty), "every emitted pair must have port =
None under the default")
+ }
+
"OperatorExecutor.produceStateOnFinish" should "default to None for any
port" in {
val exec = new IdentityExec
assert(exec.produceStateOnFinish(0).isEmpty)
@@ -105,6 +155,22 @@ class CoreExecutorReflectionSpec extends AnyFlatSpec {
assert(exec.processTupleMultiPort(tuple(99), 0).isEmpty)
}
+ it should "never invoke produceTuple on the input-side path" in {
+ // Sources only emit through onFinishMultiPort. A regression in
+ // processTupleMultiPort that accidentally drained produceTuple
+ // (discarding the output) wouldn't be caught by an empty-iterator
+ // check alone — pin via a counter.
+ var producedCalls = 0
+ val src = new SourceOperatorExecutor {
+ override def produceTuple(): Iterator[TupleLike] = {
+ producedCalls += 1
+ Iterator.empty
+ }
+ }
+ src.processTupleMultiPort(tuple(0), 0).toList
+ assert(producedCalls == 0, "input-side path must not call produceTuple")
+ }
+
"SourceOperatorExecutor.onFinishMultiPort" should "delegate to produceTuple
with no port routing" in {
val exec = new CountingSource
val out = exec.onFinishMultiPort(0).toList
@@ -163,6 +229,18 @@ class CoreExecutorReflectionSpec extends AnyFlatSpec {
}
}
+ it should "propagate NoSuchMethodException when no constructor matches
either factory branch" in {
+ // A fixture with only a `(Long)` constructor — neither the (no-arg)
+ // branch (which also tries (Int, Int) on catch) nor the (String)
+ // branch (which falls back to (String, Int, Int)) matches a single
+ // `Long` argument. Both throws propagate as NoSuchMethodException.
+ assertThrows[NoSuchMethodException] {
+ ExecFactory.newExecFromJavaClassName(
+ classOf[CoreExecutorReflectionSpec.LongArgExec].getName
+ )
+ }
+ }
+
//
---------------------------------------------------------------------------
// JavaRuntimeCompilation.compileCode
//
@@ -226,4 +304,12 @@ private object CoreExecutorReflectionSpec {
port: Int
): Iterator[org.apache.texera.amber.core.tuple.TupleLike] = Iterator.empty
}
+
+ /** Only a `(Long)` constructor — neither factory branch matches. */
+ class LongArgExec(val n: Long) extends OperatorExecutor {
+ override def processTuple(
+ tuple: org.apache.texera.amber.core.tuple.Tuple,
+ port: Int
+ ): Iterator[org.apache.texera.amber.core.tuple.TupleLike] = Iterator.empty
+ }
}