This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5678-99b9ca281dd4524b0423bac9593ce1f9f136f14d in repository https://gitbox.apache.org/repos/asf/texera.git
commit 1a65a3168c5ffbd171e8ed0d64dec495e45c1b24 Author: Justin Siek <[email protected]> AuthorDate: Sat Jun 13 10:51:17 2026 -0700 fix(workflow-operator): set alreadyClosed before onClose (#5678) ### What changes were proposed in this PR? `AutoClosingIterator.hasNext` only set `alreadyClosed = true` after calling `onClose()`. If `onClose()` threw, `alreadyClosed` stayed false, so a subsequent `hasNext` would re-invoke `onClose()`, running cleanup a second time on a resource whose close had already failed. The change sets `alreadyClosed = true` before calling `onClose()`. ### Any related issues, documentation, discussions? Closes #5660 ### How was this PR tested? Updated AutoClosingIteratorSpec — replaced the existing characterization test ("re-invoke onClose on a retry when the previous onClose threw") with a positive assertion that a second hasNext after a throwing close does NOT re-invoke onClose (closeCount stays at 1) and returns false. `sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.source.scan.AutoClosingIteratorSpec"` - 10/10 pass. `sbt scalafmtCheckAll` passes. ### Was this PR authored or co-authored using generative AI tooling? 
Generated-by: Claude Code (Claude Opus 4.8) Co-authored-by: Justin Siek <[email protected]> --- .../operator/source/scan/AutoClosingIterator.scala | 2 +- .../source/scan/AutoClosingIteratorSpec.scala | 19 +++++++++---------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/AutoClosingIterator.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/AutoClosingIterator.scala index df6a24dbf4..15a73ce2b1 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/AutoClosingIterator.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/AutoClosingIterator.scala @@ -35,8 +35,8 @@ class AutoClosingIterator[T](iter: Iterator[T], onClose: () => Unit) extends Ite override def hasNext: Boolean = { val hn = iter.hasNext if (!hn && !alreadyClosed) { - onClose() alreadyClosed = true + onClose() } hn } diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/AutoClosingIteratorSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/AutoClosingIteratorSpec.scala index 1bd1dc5ed3..784daeabcc 100644 --- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/AutoClosingIteratorSpec.scala +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/AutoClosingIteratorSpec.scala @@ -132,13 +132,11 @@ class AutoClosingIteratorSpec extends AnyFlatSpec { } it should - "re-invoke onClose on a retry when the previous onClose threw (alreadyClosed is set AFTER onClose)" in { - // Reading the production code: `alreadyClosed = true` runs AFTER - // `onClose()`. So if onClose throws, alreadyClosed stays false and - // a second hasNext will re-invoke onClose. 
This is the OPPOSITE of - // an "alreadyClosed once close was attempted" contract — characterize - // the current (brittle) behavior so a refactor that swaps the order - // (running `alreadyClosed = true` BEFORE `onClose()`) surfaces here. + "NOT re-invoke onClose on a retry when the previous onClose threw (alreadyClosed is set BEFORE onClose)" in { + // `alreadyClosed = true` runs BEFORE `onClose()`, so a throwing close + // is marked done before it can fail. The first hasNext still propagates + // the exception, but a second hasNext must NOT re-invoke onClose — it + // returns false with no further side effect. var closeCount = 0 val it = new AutoClosingIterator[Int]( Iterator.empty, @@ -147,8 +145,9 @@ class AutoClosingIteratorSpec extends AnyFlatSpec { throw new RuntimeException("boom") } ) - intercept[RuntimeException] { it.hasNext } - intercept[RuntimeException] { it.hasNext } // current impl re-runs onClose - assert(closeCount == 2, s"current impl re-fires onClose on retry; got $closeCount") + intercept[RuntimeException] { it.hasNext } // first (and only) close attempt + assert(closeCount == 1, s"onClose must fire once, got $closeCount") + assert(!it.hasNext, "retry after a throwing close returns false without re-closing") + assert(closeCount == 1, s"onClose must not re-fire on retry; got $closeCount") } }
