This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5657-190823f7562ba8c0bb2a515b0ce4823cf640e049
in repository https://gitbox.apache.org/repos/asf/texera.git

commit e0a96478817a5c7a2f585de1952e40f7c8ba534f
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Jun 12 17:22:16 2026 -0700

    test(workflow-operator): add unit test coverage for AutoClosingIterator and UnionOpExec (#5657)
    
    ### What changes were proposed in this PR?
    
    Pin behavior of two previously-uncovered helpers in
    `common/workflow-operator`. No production-code changes.
    
    | Spec | Source class | Tests |
    | --- | --- | --- |
    | `AutoClosingIteratorSpec` | `AutoClosingIterator` | 10 |
    | `UnionOpExecSpec` | `UnionOpExec` | 7 |
    
    Both spec files follow the `<srcClassName>Spec.scala` one-to-one
    convention.
    
    **Behavior pinned — `AutoClosingIterator`**
    
    | Surface | Contract |
    | --- | --- |
    | `hasNext` (non-empty source) | `true`; `onClose` not invoked |
    | `hasNext` (exhausted source) | `false`; `onClose` invoked exactly once |
    | Repeated `hasNext` after exhaustion | does NOT re-fire `onClose` (`alreadyClosed` guard) |
    | `next()` | delegates straight to the wrapped iterator (in order) |
    | Full traversal via `toList` | yields every element; `onClose` fires once at the end |
    | Already-empty source | first `hasNext` returns `false` and fires `onClose` |
    | Mid-iteration | `onClose` stays un-fired between elements |
    | `onClose` throws | exception propagates (no swallowing) |
    | `onClose` throws + retry | current impl re-fires `onClose` (assignment to `alreadyClosed` runs AFTER `onClose()`); characterization pins this behavior |
    
    **Behavior pinned — `UnionOpExec`**
    
    | Surface | Contract |
    | --- | --- |
    | `processTuple(tuple, port = 0)` | yields a single-element iterator containing the tuple |
    | `processTuple` (any port) | port-agnostic — same tuple passes through for ports 0, 1, 5, 99, MaxValue, -1 |
    | Tuple identity | pass-through preserves the exact `Tuple` reference (no copy) |
    | Successive calls | each returns an independent fresh iterator (no shared cursor) |
    | Per-call iterator | yields exactly one element |
    | `null` tuple | passes through as `null` (the impl does not null-check) |
    | Type contract | `UnionOpExec` is an `OperatorExecutor` |
    
    ### Any related issues, documentation, discussions?
    
    Closes #5653.
    
    ### How was this PR tested?
    
    Pure unit-test additions; verified locally with:
    
    - `sbt "WorkflowOperator/testOnly
    org.apache.texera.amber.operator.source.scan.AutoClosingIteratorSpec
    org.apache.texera.amber.operator.union.UnionOpExecSpec"` — 17 tests, all
    green
    - `sbt scalafmtCheckAll` — clean
    - CI to confirm
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.7 [1M context])
---
 .../source/scan/AutoClosingIteratorSpec.scala      | 154 +++++++++++++++++++++
 .../amber/operator/union/UnionOpExecSpec.scala     | 114 +++++++++++++++
 2 files changed, 268 insertions(+)

diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/AutoClosingIteratorSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/AutoClosingIteratorSpec.scala
new file mode 100644
index 0000000000..1bd1dc5ed3
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/AutoClosingIteratorSpec.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan
+
+import org.scalatest.flatspec.AnyFlatSpec
+
+class AutoClosingIteratorSpec extends AnyFlatSpec {
+
+  // 
---------------------------------------------------------------------------
+  // hasNext + onClose firing semantics
+  // 
---------------------------------------------------------------------------
+
+  "AutoClosingIterator.hasNext (non-empty underlying)" should
+    "return true and NOT invoke onClose" in {
+    var closed = false
+    val it = new AutoClosingIterator[Int](Iterator(1, 2, 3), () => closed = 
true)
+    assert(it.hasNext)
+    assert(!closed, "onClose must not fire while elements remain")
+  }
+
+  "AutoClosingIterator.hasNext (exhausted underlying)" should
+    "return false and invoke onClose exactly once" in {
+    var closeCount = 0
+    val it = new AutoClosingIterator[Int](Iterator.empty, () => closeCount += 
1)
+    assert(!it.hasNext)
+    assert(closeCount == 1)
+  }
+
+  it should "NOT invoke onClose again on a second hasNext after exhaustion" in 
{
+    var closeCount = 0
+    val it = new AutoClosingIterator[Int](Iterator.empty, () => closeCount += 
1)
+    assert(!it.hasNext)
+    assert(!it.hasNext)
+    assert(!it.hasNext)
+    assert(closeCount == 1, s"onClose must fire exactly once, got $closeCount 
calls")
+  }
+
+  // 
---------------------------------------------------------------------------
+  // next() — delegates straight through
+  // 
---------------------------------------------------------------------------
+
+  "AutoClosingIterator.next" should "delegate to the wrapped iterator (in 
order)" in {
+    val it = new AutoClosingIterator[Int](Iterator(10, 20, 30), () => ())
+    assert(it.next() == 10)
+    assert(it.next() == 20)
+    assert(it.next() == 30)
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Full traversal — onClose fires exactly once at the end
+  // 
---------------------------------------------------------------------------
+
+  "AutoClosingIterator full traversal" should
+    "yield every element of the wrapped iterator in order" in {
+    val it = new AutoClosingIterator[Int](Iterator(1, 2, 3, 4, 5), () => ())
+    assert(it.toList == List(1, 2, 3, 4, 5))
+  }
+
+  it should "fire onClose exactly once when toList finishes consuming" in {
+    var closeCount = 0
+    val it = new AutoClosingIterator[Int](Iterator(1, 2, 3), () => closeCount 
+= 1)
+    val _ = it.toList
+    assert(closeCount == 1, s"expected single onClose firing, got $closeCount")
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Already-empty source — close fires on the very first hasNext call
+  // 
---------------------------------------------------------------------------
+
+  "AutoClosingIterator over an already-empty source" should
+    "fire onClose on the very first hasNext call" in {
+    var fired = false
+    val it = new AutoClosingIterator[Int](Iterator.empty, () => fired = true)
+    val _ = it.hasNext
+    assert(fired, "onClose must fire when the source is already empty")
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Mid-iteration close behavior — onClose does NOT fire before exhaustion
+  // 
---------------------------------------------------------------------------
+
+  "AutoClosingIterator (mid-iteration)" should
+    "leave onClose un-fired between elements (only fires after hasNext returns 
false)" in {
+    var fired = false
+    val it = new AutoClosingIterator[Int](Iterator(1, 2, 3), () => fired = 
true)
+    // Step-by-step assertion: after each hasNext that returns TRUE,
+    // onClose MUST still be un-fired. Only the hasNext that returns
+    // false may flip `fired`. A bug that prematurely closed during a
+    // truthy hasNext would surface here, not just at the loop's exit.
+    assert(it.hasNext); assert(!fired, "onClose must not fire while element 1 
is reachable")
+    assert(it.next() == 1)
+    assert(it.hasNext); assert(!fired, "onClose must not fire while element 2 
is reachable")
+    assert(it.next() == 2)
+    assert(it.hasNext); assert(!fired, "onClose must not fire while element 3 
is reachable")
+    assert(it.next() == 3)
+    // The final hasNext returns false — THIS is the call that fires onClose.
+    assert(!it.hasNext)
+    assert(fired, "after hasNext first returns false, onClose must have fired")
+  }
+
+  // 
---------------------------------------------------------------------------
+  // onClose exception propagation
+  // 
---------------------------------------------------------------------------
+
+  "AutoClosingIterator" should
+    "propagate exceptions thrown from onClose (no swallowing)" in {
+    val it = new AutoClosingIterator[Int](
+      Iterator.empty,
+      () => throw new IllegalStateException("close failed")
+    )
+    val ex = intercept[IllegalStateException] {
+      it.hasNext
+    }
+    assert(ex.getMessage == "close failed")
+  }
+
+  it should
+    "re-invoke onClose on a retry when the previous onClose threw 
(alreadyClosed is set AFTER onClose)" in {
+    // Reading the production code: `alreadyClosed = true` runs AFTER
+    // `onClose()`. So if onClose throws, alreadyClosed stays false and
+    // a second hasNext will re-invoke onClose. This is the OPPOSITE of
+    // an "alreadyClosed once close was attempted" contract — characterize
+    // the current (brittle) behavior so a refactor that swaps the order
+    // (running `alreadyClosed = true` BEFORE `onClose()`) surfaces here.
+    var closeCount = 0
+    val it = new AutoClosingIterator[Int](
+      Iterator.empty,
+      () => {
+        closeCount += 1
+        throw new RuntimeException("boom")
+      }
+    )
+    intercept[RuntimeException] { it.hasNext }
+    intercept[RuntimeException] { it.hasNext } // current impl re-runs onClose
+    assert(closeCount == 2, s"current impl re-fires onClose on retry; got 
$closeCount")
+  }
+}
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/union/UnionOpExecSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/union/UnionOpExecSpec.scala
new file mode 100644
index 0000000000..59a4db213b
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/union/UnionOpExecSpec.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.union
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, 
Tuple}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class UnionOpExecSpec extends AnyFlatSpec {
+
+  // 
---------------------------------------------------------------------------
+  // Fixture builders
+  // 
---------------------------------------------------------------------------
+
+  private val attr = new Attribute("v", AttributeType.INTEGER)
+  private val schema: Schema = Schema().add(attr)
+  private def tuple(v: Int): Tuple =
+    Tuple.builder(schema).add(attr, Integer.valueOf(v)).build()
+
+  // 
---------------------------------------------------------------------------
+  // Pass-through semantics
+  // 
---------------------------------------------------------------------------
+
+  "UnionOpExec.processTuple" should
+    "yield a single-element iterator containing the input tuple" in {
+    val exec = new UnionOpExec
+    val t = tuple(42)
+    val out = exec.processTuple(t, port = 0).toList
+    assert(out == List(t))
+  }
+
+  it should "preserve the exact Tuple instance (pass-through, no copy)" in {
+    val exec = new UnionOpExec
+    val t = tuple(7)
+    val out = exec.processTuple(t, port = 0).toList
+    assert(out.size == 1)
+    assert(out.head eq t, "pass-through must return the same Tuple reference")
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Port-agnostic behavior — union merges streams regardless of port id
+  // 
---------------------------------------------------------------------------
+
+  it should "yield the same tuple regardless of which input port it arrived 
on" in {
+    val exec = new UnionOpExec
+    val t = tuple(1)
+    val portsTested = List(0, 1, 5, 99, Int.MaxValue, -1)
+    portsTested.foreach { p =>
+      assert(exec.processTuple(t, port = p).toList == List(t), s"port=$p must 
pass through")
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Repeated calls — no state leakage
+  // 
---------------------------------------------------------------------------
+
+  it should "return an independent fresh iterator on each call (no shared 
cursor)" in {
+    val exec = new UnionOpExec
+    val a = tuple(1)
+    val b = tuple(2)
+    val itA = exec.processTuple(a, port = 0)
+    val itB = exec.processTuple(b, port = 1)
+    // Consume a before b — neither call should affect the other.
+    assert(itA.toList == List(a))
+    assert(itB.toList == List(b))
+  }
+
+  it should "produce exactly one element per processTuple call" in {
+    val exec = new UnionOpExec
+    val t = tuple(1)
+    val iter = exec.processTuple(t, port = 0)
+    assert(iter.hasNext)
+    iter.next()
+    assert(!iter.hasNext, "iterator must be exhausted after the single 
pass-through")
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Null tuple — pass-through is unconditional
+  // 
---------------------------------------------------------------------------
+
+  it should "pass-through a null tuple unchanged (the impl does not 
null-check)" in {
+    // Pin current behavior: `Iterator(tuple)` with `tuple = null` yields
+    // an iterator containing `null`. If a future change adds a null-
+    // check, that's a behavior change worth catching.
+    val exec = new UnionOpExec
+    val out = exec.processTuple(null, port = 0).toList
+    assert(out == List(null))
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Type contract — UnionOpExec is an OperatorExecutor
+  // 
---------------------------------------------------------------------------
+
+  "UnionOpExec" should "be an OperatorExecutor (compile-time enforced)" in {
+    val exec: org.apache.texera.amber.core.executor.OperatorExecutor = new 
UnionOpExec
+    assert(exec.processTuple(tuple(1), port = 0).toList.size == 1)
+  }
+}

Reply via email to