This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5606-227cbd73960afbcaa734b30f3ac108dc669324f3
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 0d8f9a7b8a210447364c43cbfe8aa3fe0d1f8cd5
Author: Suyash Jain <[email protected]>
AuthorDate: Fri Jun 12 13:05:06 2026 -0700

fix(workflow-operator): no null padding in reservoir sampling (#5606)

### What changes were proposed in this PR?

`ReservoirSamplingOpExec` allocates a fixed-size reservoir of length `count` (the per-worker share of `k`). When a worker receives fewer tuples than `count`, only the first `n` slots are filled (`n` being the number of tuples it received). However, `onFinish` returned the whole array, yielding `count - n` trailing `null` entries. The nulls are currently swallowed by a distant null-guard in `DataProcessor`, so the bug is latent. Even so, the operator violates the "do not emit null tuples" contract and would break if that guard were ever narrowed or bypassed.

```
Before: input < k -> onFinish emits [t0 .. tn-1, null, ..., null]  (engine guard hides them)
After:  input < k -> onFinish emits [t0 .. tn-1]                   (no nulls emitted at all)
```

The fix emits only the filled prefix:

```scala
override def onFinish(port: Int): Iterator[TupleLike] = reservoir.iterator.take(n)
```

`take(n)` is a no-op when `n >= count`, that is, when the worker received at least its share (input ≥ k on a single worker). So the sampled output is unchanged in the normal case.

### Any related issues, documentation, discussions?

Closes #5592

### How was this PR tested?

Added three regression cases to `ReservoirSamplingOpExecSpec`:

| Case | Asserts |
| --- | --- |
| `input size < k` | only the received tuples are emitted, in order, no nulls |
| empty input | `onFinish` emits nothing |
| skewed partitioning (`k=10`, 3 workers, worker 0 gets 2 tuples) | no null padding for an under-filled worker share |

All three fail against the old `reservoir.iterator` and pass with `reservoir.iterator.take(n)`. The 9 pre-existing cases stay green (TDD red → green, verified by stashing the source fix).
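The failure mode and the three regression scenarios can be reproduced in isolation with a minimal sketch of textbook reservoir sampling (Algorithm R). This is an illustration under stated assumptions, not Texera code:

- `ReservoirSketch`, `ReservoirDemo`, and the `seed` parameter are hypothetical.
- `String` stands in for `TupleLike`.
- The slot-replacement rule is assumed rather than copied from `ReservoirSamplingOpExec`.

`finishOld` mimics the pre-fix `reservoir.iterator`, and `finish` mimics `reservoir.iterator.take(n)`.

```scala
import scala.util.Random

// Hypothetical stand-in for ReservoirSamplingOpExec: String replaces TupleLike,
// and the replacement rule is textbook Algorithm R (assumed, not copied).
final class ReservoirSketch(count: Int, seed: Long = 42L) {
  private val rand = new Random(seed)
  private val reservoir = new Array[String](count) // unfilled slots stay null
  private var n = 0                                // tuples received so far

  def process(t: String): Unit = {
    if (n < count) {
      reservoir(n) = t // fill phase: slot n is still empty
    } else {
      val j = rand.nextInt(n + 1) // keep t with probability count / (n + 1)
      if (j < count) reservoir(j) = t
    }
    n += 1
  }

  // Pre-fix behavior: every slot is returned, including the null padding.
  def finishOld(): List[String] = reservoir.iterator.toList

  // Post-fix behavior: only the filled prefix; take(n) is a no-op once n >= count.
  def finish(): List[String] = reservoir.iterator.take(n).toList
}

object ReservoirDemo {
  // Feeds all inputs into a fresh sketch with the given reservoir size.
  def run(count: Int, inputs: Seq[String]): ReservoirSketch = {
    val s = new ReservoirSketch(count)
    inputs.foreach(s.process)
    s
  }
}
```

With `count = 5` and two inputs, `finishOld()` returns `List(t0, t1, null, null, null)` while `finish()` returns `List(t0, t1)`. With 100 inputs both return five non-null samples, which is why `take(n)` leaves the normal case unchanged.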
```
sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.reservoirsampling.ReservoirSamplingOpExecSpec"
# Tests: succeeded 12, failed 0, canceled 0, ignored 0, pending 0
```

`sbt WorkflowOperator/scalafixAll` and `sbt WorkflowOperator/scalafmtAll` produce no further diff.

### Was this PR authored or co-authored using generative AI tooling?

Yes, partially. I (Suyash Jain) worked on this PR together with Claude Code as a pair-programming assistant. I reviewed the final diff, ran the spec locally, and verified the red → green behavior of the new regression tests myself before opening the PR.

Generated-by: Claude Code (Claude Opus 4.7)
Co-authored-by: Xuan Gu <[email protected]>
---
 .../reservoirsampling/ReservoirSamplingOpExec.scala |  4 +++-
 .../ReservoirSamplingOpExecSpec.scala               | 21 +++++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExec.scala
index 155b0c99b2..3ec8d017c8 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExec.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExec.scala
@@ -58,6 +58,8 @@ class ReservoirSamplingOpExec(descString: String, idx: Int, workerCount: Int)
     Iterator()
   }
 
-  override def onFinish(port: Int): Iterator[TupleLike] = reservoir.iterator
+  // Only the first n slots are filled when the input is smaller than the reservoir;
+  // take(n) keeps the trailing unfilled (null) slots from being emitted.
+  override def onFinish(port: Int): Iterator[TupleLike] = reservoir.iterator.take(n)
 }
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExecSpec.scala
index e517fc3da6..a4cb103ea7 100644
--- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExecSpec.scala
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExecSpec.scala
@@ -105,6 +105,27 @@ class ReservoirSamplingOpExecSpec extends AnyFlatSpec {
     assert(emitted == List(tuple(0), tuple(1), tuple(2), tuple(3)))
   }
 
+  it should "emit only the filled prefix, without null padding, when input size < k" in {
+    val exec = newExec(k = 5)
+    val emitted = runFinish(exec, 0 until 2)
+    assert(emitted == List(tuple(0), tuple(1)), "only the received tuples are emitted, in order")
+    assert(!emitted.contains(null), "the unfilled reservoir slots must not leak as null tuples")
+  }
+
+  it should "emit nothing when the input stream is empty" in {
+    val exec = newExec(k = 5)
+    val emitted = runFinish(exec, Seq.empty)
+    assert(emitted.isEmpty, "an unfilled reservoir with no input emits no (null) tuples")
+  }
+
+  it should "not emit null padding on a worker that receives fewer tuples than its share" in {
+    // k=10 over 3 workers gives worker 0 a share of 4 (equallyPartitionGoal), but skewed
+    // partitioning delivers it only 2 tuples; the 2 unfilled slots must not surface as nulls.
+    val exec = newExec(k = 10, idx = 0, workerCount = 3)
+    val emitted = runFinish(exec, 0 until 2)
+    assert(emitted == List(tuple(0), tuple(1)))
+  }
+
   it should "keep exactly k tuples, all drawn from the input, when input size > k" in {
     val exec = newExec(k = 5)
     val input = 0 until 100
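The skewed-partitioning test hinges on how `k` is divided among workers. Its comment states that `k=10` over 3 workers gives worker 0 a share of 4 via `equallyPartitionGoal`. Below is a minimal sketch of one split consistent with that comment, assuming the first `k % workerCount` workers each take one extra slot. `ShareSketch`, `shareOf`, and `nullPadding` are hypothetical names, and Texera's actual `equallyPartitionGoal` may differ in detail.

```scala
// Hypothetical helper: assumes the first (k % workerCount) workers take one extra slot.
// This is not Texera's equallyPartitionGoal, only a split consistent with the test comment.
object ShareSketch {
  def shareOf(k: Int, idx: Int, workerCount: Int): Int = {
    require(workerCount > 0 && idx >= 0 && idx < workerCount)
    val base = k / workerCount
    if (idx < k % workerCount) base + 1 else base
  }

  // Trailing null slots that the pre-fix onFinish would have emitted for this worker.
  def nullPadding(k: Int, idx: Int, workerCount: Int, received: Int): Int =
    math.max(0, shareOf(k, idx, workerCount) - received)
}
```

Under this assumption the shares for `k=10` over 3 workers are 4, 3, 3. A worker 0 that receives only 2 tuples therefore has 2 unfilled slots, which is exactly the padding the old `onFinish` leaked.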
