This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch release/v1.2
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/release/v1.2 by this push:
     new 045328e08b test(workflow-operator): add ReservoirSamplingOpExec spec 
[release/v1.2 backport] (#5642)
045328e08b is described below

commit 045328e08bc50031ffbe5ad57437b0b0b9757c26
Author: Xuan Gu <[email protected]>
AuthorDate: Fri Jun 12 12:09:37 2026 -0700

    test(workflow-operator): add ReservoirSamplingOpExec spec [release/v1.2 
backport] (#5642)
---
 .../ReservoirSamplingOpExecSpec.scala              | 181 +++++++++++++++++++++
 1 file changed, 181 insertions(+)

diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExecSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExecSpec.scala
new file mode 100644
index 0000000000..e517fc3da6
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExecSpec.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.reservoirsampling
+
+import com.fasterxml.jackson.databind.node.ObjectNode
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, 
Tuple}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+class ReservoirSamplingOpExecSpec extends AnyFlatSpec {
+
+  private val schema: Schema =
+    Schema().add(new Attribute("v", AttributeType.INTEGER))
+
+  private def tuple(v: Int): Tuple =
+    Tuple.builder(schema).add(new Attribute("v", AttributeType.INTEGER), 
Integer.valueOf(v)).build()
+
+  // A wider schema mixing every primitive attribute type, to prove sampling 
treats
+  // tuples opaquely and preserves all fields regardless of arity or type.
+  private val complexSchema: Schema =
+    Schema()
+      .add(new Attribute("id", AttributeType.INTEGER))
+      .add(new Attribute("label", AttributeType.STRING))
+      .add(new Attribute("score", AttributeType.DOUBLE))
+      .add(new Attribute("flag", AttributeType.BOOLEAN))
+      .add(new Attribute("big", AttributeType.LONG))
+
+  private def complexTuple(i: Int): Tuple =
+    Tuple
+      .builder(complexSchema)
+      .add(new Attribute("id", AttributeType.INTEGER), Integer.valueOf(i))
+      .add(new Attribute("label", AttributeType.STRING), s"row-$i")
+      .add(new Attribute("score", AttributeType.DOUBLE), 
java.lang.Double.valueOf(i * 1.5))
+      .add(new Attribute("flag", AttributeType.BOOLEAN), 
java.lang.Boolean.valueOf(i % 2 == 0))
+      .add(new Attribute("big", AttributeType.LONG), 
java.lang.Long.valueOf(i.toLong))
+      .build()
+
+  // LogicalOp is registered for polymorphic Jackson deserialization via the
+  // `operatorType` discriminator, so a hand-rolled `{"k":N}` string would fail
+  // to bind. Serialize a real `ReservoirSamplingOpDesc` to embed the 
discriminator.
+  private def desc(k: Int): String = {
+    val d = new ReservoirSamplingOpDesc()
+    d.k = k
+    objectMapper.writeValueAsString(d)
+  }
+
+  // `k` is renamed by @JsonProperty, so resolve the JSON key from the 
annotation
+  // rather than hard-coding it, then overwrite that slot with null on a real 
desc.
+  private def descWithNullK: String = {
+    val node = objectMapper.valueToTree[ObjectNode](new 
ReservoirSamplingOpDesc())
+    val keyForK =
+      classOf[ReservoirSamplingOpDesc]
+        .getDeclaredField("k")
+        .getAnnotation(classOf[com.fasterxml.jackson.annotation.JsonProperty])
+        .value()
+    node.putNull(keyForK)
+    objectMapper.writeValueAsString(node)
+  }
+
+  private def newExec(k: Int, idx: Int = 0, workerCount: Int = 1): 
ReservoirSamplingOpExec = {
+    val exec = new ReservoirSamplingOpExec(desc(k), idx, workerCount)
+    exec.open()
+    exec
+  }
+
+  /** Feed every value through processTuple, then drain onFinish into a list. 
*/
+  private def runFinish(exec: ReservoirSamplingOpExec, values: Seq[Int]): 
List[Tuple] =
+    runFinishTuples(exec, values.map(tuple))
+
+  /** Feed pre-built tuples through processTuple, then drain onFinish into a 
list. */
+  private def runFinishTuples(exec: ReservoirSamplingOpExec, tuples: 
Seq[Tuple]): List[Tuple] = {
+    tuples.foreach(t => exec.processTuple(t, 0))
+    exec.onFinish(0).map(_.asInstanceOf[Tuple]).toList
+  }
+
+  "ReservoirSamplingOpExec.processTuple" should "buffer silently and emit 
nothing until onFinish" in {
+    val exec = newExec(k = 3)
+    val perTupleEmissions = (0 until 10).map(i => exec.processTuple(tuple(i), 
0).toList)
+    assert(
+      perTupleEmissions.forall(_.isEmpty),
+      "processTuple should never emit; sampling emits on finish"
+    )
+  }
+
+  "ReservoirSamplingOpExec.onFinish" should "return all input tuples in order 
when input size == k" in {
+    val exec = newExec(k = 4)
+    val emitted = runFinish(exec, 0 until 4)
+    assert(emitted == List(tuple(0), tuple(1), tuple(2), tuple(3)))
+  }
+
+  it should "keep exactly k tuples, all drawn from the input, when input size 
> k" in {
+    val exec = newExec(k = 5)
+    val input = 0 until 100
+    val emitted = runFinish(exec, input)
+
+    assert(emitted.size == 5, "reservoir must hold exactly k samples")
+    assert(!emitted.contains(null), "no null padding when the reservoir is 
fully filled")
+    val inputTuples = input.map(tuple).toSet
+    assert(
+      emitted.forall(inputTuples.contains),
+      "every sample must originate from the input stream"
+    )
+    assert(emitted.distinct.size == emitted.size, "each input tuple is sampled 
at most once")
+  }
+
+  it should "be deterministic across runs (RNG is seeded, so identical input 
yields identical samples)" in {
+    val input = 0 until 100
+    val firstRun = runFinish(newExec(k = 7), input)
+    val secondRun = runFinish(newExec(k = 7), input)
+    assert(firstRun == secondRun)
+    // Sanity-check the sample is not simply the first k tuples, i.e. 
replacement happened.
+    assert(firstRun != (0 until 7).map(tuple).toList)
+  }
+
+  it should "distribute k across workers via equallyPartitionGoal (k=10, 3 
workers -> 4,3,3)" in {
+    // The remainder is handed to the lowest-indexed workers, so worker 0 
keeps one extra.
+    val perWorkerSize = (0 until 3).map { idx =>
+      runFinish(newExec(k = 10, idx = idx, workerCount = 3), 0 until 50).size
+    }
+    assert(perWorkerSize == Seq(4, 3, 3))
+    assert(perWorkerSize.sum == 10, "the per-worker reservoirs together hold 
the requested k")
+  }
+
+  "ReservoirSamplingOpExec.open" should "reset state so a reused executor 
re-samples from scratch" in {
+    val exec = newExec(k = 3)
+    runFinish(exec, 0 until 20) // first pass consumes the executor's state
+    exec.open() // reopen should clear n and the reservoir
+    val emitted = runFinish(exec, Seq(100, 101, 102))
+    assert(emitted == List(tuple(100), tuple(101), tuple(102)))
+  }
+
+  it should "preserve every field of multi-attribute tuples drawn from the 
input (complex schema)" in {
+    val exec = newExec(k = 5)
+    val input = (0 until 100).map(complexTuple)
+    val emitted = runFinishTuples(exec, input)
+
+    assert(emitted.size == 5, "reservoir must hold exactly k samples")
+    assert(!emitted.contains(null), "no null padding when the reservoir is 
fully filled")
+    val inputTuples = input.toSet
+    assert(
+      emitted.forall(inputTuples.contains),
+      "each sample is an intact input tuple with all five attributes preserved"
+    )
+    assert(emitted.distinct.size == emitted.size, "each input tuple is sampled 
at most once")
+  }
+
+  "ReservoirSamplingOpExec with a bad k" should "reject a negative k with a 
negative-sized reservoir" in {
+    // equallyPartitionGoal(-1, 1) -> count = -1, so open() allocates 
Array.ofDim(-1).
+    assertThrows[NegativeArraySizeException] {
+      newExec(k = -1)
+    }
+  }
+
+  it should "coerce a null k to 0 and reject sampling a populated stream" in {
+    // A null k deserializes to the primitive default 0, yielding a zero-length
+    // reservoir; the first replacement draw then calls Random.nextInt(0).
+    val exec = new ReservoirSamplingOpExec(descWithNullK, 0, 1)
+    exec.open()
+    assert(exec.onFinish(0).isEmpty, "an empty reservoir emits nothing")
+    assertThrows[IllegalArgumentException] {
+      exec.processTuple(tuple(0), 0)
+    }
+  }
+}

Reply via email to