This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 75b46197d3 test(amber): add unit test coverage for record-storage cluster (#5447)
75b46197d3 is described below

commit 75b46197d3584e0f211d5d3211bef571f5c61e9c
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Jun 7 16:32:10 2026 -0700

    test(amber): add unit test coverage for record-storage cluster (#5447)
    
    ### What changes were proposed in this PR?
    
    Pin the behavior of three previously uncovered modules in
    `engine/common/storage` that sit on the checkpoint / fault-tolerance hot
    path via `SequentialRecordStorage.getStorage(...)`. No production-code
    changes.
    
    | Spec | Source class | Tests |
    | --- | --- | --- |
    | `EmptyRecordStorageSpec` | `EmptyRecordStorage` | 11 |
    | `VFSRecordStorageSpec` | `VFSRecordStorage` | 9 |
    | `SequentialRecordStorageSpec` | `SequentialRecordStorage` (abstract + companion) | 9 |
    
    All three spec files follow the `<srcClassName>Spec.scala` one-to-one
    convention.
    
    **Behavior pinned**
    
    | Surface | Contract |
    | --- | --- |
    | `SequentialRecordStorage.getStorage(None)` | dispatches to `EmptyRecordStorage` |
    | `SequentialRecordStorage.getStorage(Some(file://…))` | dispatches to `VFSRecordStorage` and the returned instance round-trips a record |
    | `SequentialRecordWriter` / `SequentialRecordReader` | round-trip a sequence of records through the size-prefixed binary frame; the reader's `inputStreamGen` thunk supports re-reading the same byte stream |
    | `SequentialRecordStorage.fetchAllRecords` | yields the underlying iterator's records in order (and an empty `Iterable` when nothing was written) |
    | `VFSRecordStorage` constructor | auto-creates the target folder; leaves an existing folder + contents intact |
    | `VFSRecordStorage.getWriter` / `getReader` | round-trip records through a local `file://` URI; produce an empty iterator when the file has no records; multiple files under the same folder do not cross-pollinate |
    | `VFSRecordStorage.deleteStorage` | removes the on-disk folder created by the constructor |
    | `VFSRecordStorage.containsFolder` | distinguishes existing folder vs. existing file vs. missing entry |
    | `EmptyRecordStorage.containsFolder` | always returns `false` regardless of folder name |
    | `EmptyRecordStorage.deleteStorage` | is a safe no-op (idempotent) |
    | `EmptyRecordStorage.getReader` | yields zero records for any fileName; successive `getReader` calls produce independent iterators |
    | `EmptyRecordStorage.getWriter` | returns a writer whose `flush()` / `close()` work without `writeRecord` having been called; a second writer is unaffected by closing the first |
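
    The Writer/Reader framing and the null-object reader in the table can
    be modeled in a few lines. This is a minimal sketch, not the production
    code: `FramedWriter`, `FramedReader`, `EmptyStorageSketch`, and `Utf8`
    are hypothetical names, the `ser` / `de` functions stand in for
    `AmberRuntime.serde`, and the frame layout (a 4-byte length written
    with `writeInt`, then the payload) is an assumption consistent with the
    reader stopping on an EOF from `readInt()`. The empty variant uses the
    JDK's `OutputStream.nullOutputStream()` (Java 11+) and a 0-byte input
    stream in place of the `NullOutputStream` / `NullInputStream` the spec
    comments mention.

    ```scala
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, EOFException, OutputStream}
    import java.nio.charset.StandardCharsets

    // Hypothetical model of the size-prefixed framing; `ser` stands in for
    // AmberRuntime.serde.serialize.
    final class FramedWriter[T](out: DataOutputStream, ser: T => Array[Byte]) {
      def writeRecord(record: T): Unit = {
        val bytes = ser(record)
        out.writeInt(bytes.length) // assumed 4-byte big-endian size prefix
        out.write(bytes)           // payload
      }
      def flush(): Unit = out.flush()
      def close(): Unit = out.close()
    }

    // The reader takes a thunk, so every iterator opens its own stream and
    // no input state is shared between iterators.
    final class FramedReader[T](inputStreamGen: () => DataInputStream, de: Array[Byte] => T) {
      def mkRecordIterator(): Iterator[T] = {
        val in = inputStreamGen()
        Iterator.continually(readFrame(in)).takeWhile(_.isDefined).map(_.get)
      }

      // EOF (on the size prefix or a truncated payload) ends the sequence
      // before `de` runs.
      private def readFrame(in: DataInputStream): Option[T] =
        try {
          val size = in.readInt()
          val buf = new Array[Byte](size)
          in.readFully(buf)
          Some(de(buf))
        } catch {
          case _: EOFException =>
            in.close()
            None
        }
    }

    // Null-object analogue of EmptyRecordStorage: empty input, discarding output.
    object EmptyStorageSketch {
      def reader[T](de: Array[Byte] => T): FramedReader[T] =
        new FramedReader[T](() => new DataInputStream(new ByteArrayInputStream(Array.emptyByteArray)), de)
      def writer[T](ser: T => Array[Byte]): FramedWriter[T] =
        new FramedWriter[T](new DataOutputStream(OutputStream.nullOutputStream()), ser)
    }

    // UTF-8 codec used in place of the Pekko serde for String records.
    object Utf8 {
      val ser: String => Array[Byte] = _.getBytes(StandardCharsets.UTF_8)
      val de: Array[Byte] => String = bytes => new String(bytes, StandardCharsets.UTF_8)
    }
    ```

    Because `FramedReader` calls `inputStreamGen` once per
    `mkRecordIterator()`, two iterators over the same bytes both yield the
    full sequence, which is the property the `inputStreamGen` test pins.
    The sketch returns `Option` from `readFrame` where the spec comments
    describe the real iterator returning `null` on EOF.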
    
    **Notes**
    
    - The `hdfs://` dispatch branch of `getStorage` is deliberately left out
    — `HDFSRecordStorage`'s constructor calls `FileSystem.get`, which can
    block on DNS / network and is unit-test-hostile. The branch is a single
    line and any regression there would surface immediately in higher-level
    checkpoint / fault-tolerance suites that exercise `hdfs://` URIs.
    - The serde-touching paths (`SequentialRecordWriter.writeRecord` /
    `SequentialRecordReader`'s iterator) hard-code `AmberRuntime.serde`. The
    two specs that exercise this path (`VFSRecordStorageSpec`,
    `SequentialRecordStorageSpec`) own a suite-local `ActorSystem` and
    inject it into `AmberRuntime` via reflection, tearing it down in
    `afterAll` — same pattern as `CheckpointSubsystemSpec` /
    `ClientEventSpec`. `EmptyRecordStorageSpec` deliberately avoids
    `writeRecord` so it does not need the harness.
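
    The `getStorage` dispatch rule discussed above can be sketched as a
    pure function over `Option[URI]`. The `StorageKind` types and
    `StorageDispatchSketch` are hypothetical: they only describe which
    storage would be built, so even the `hdfs` branch can be checked
    without calling `FileSystem.get`. The sketch assumes every scheme other
    than `hdfs` (compared case-insensitively, as in
    `scheme.toLowerCase == "hdfs"`) falls through to `VFSRecordStorage`;
    this PR itself only pins `None` and `file://`.

    ```scala
    import java.net.URI

    // Hypothetical description of which storage the factory would build.
    sealed trait StorageKind
    case object EmptyKind extends StorageKind
    final case class HdfsKind(uri: URI) extends StorageKind
    final case class VfsKind(uri: URI) extends StorageKind

    object StorageDispatchSketch {
      // None -> null object; hdfs (any case) -> HDFS; anything else -> VFS (assumed).
      def dispatch(location: Option[URI]): StorageKind = location match {
        case None => EmptyKind
        // getScheme is null for relative URIs; Option(...) keeps the guard null-safe.
        case Some(uri) if Option(uri.getScheme).exists(_.toLowerCase == "hdfs") => HdfsKind(uri)
        case Some(uri) => VfsKind(uri)
      }
    }
    ```

    Splitting the scheme decision from construction is one possible way to
    cover the `hdfs` branch in a unit test; nothing in this PR suggests the
    production factory is structured that way.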
    
    ### Any related issues, documentation, discussions?
    
    Closes #5446.
    
    ### How was this PR tested?
    
    Pure unit-test additions; verified locally with:
    
    - `sbt "WorkflowExecutionService/testOnly
    org.apache.texera.amber.engine.common.storage.EmptyRecordStorageSpec
    org.apache.texera.amber.engine.common.storage.SequentialRecordStorageSpec
    org.apache.texera.amber.engine.common.storage.VFSRecordStorageSpec"` —
    29 tests, all green
    - `sbt scalafmtCheckAll` — clean
    - CI to confirm
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Sonnet 4.5)
---
 .../common/storage/EmptyRecordStorageSpec.scala    | 155 +++++++++++++
 .../storage/SequentialRecordStorageSpec.scala      | 244 +++++++++++++++++++
 .../common/storage/VFSRecordStorageSpec.scala      | 258 +++++++++++++++++++++
 3 files changed, 657 insertions(+)

diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/EmptyRecordStorageSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/EmptyRecordStorageSpec.scala
new file mode 100644
index 0000000000..e3e73fe235
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/EmptyRecordStorageSpec.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.common.storage
+
+import org.scalatest.flatspec.AnyFlatSpec
+
+class EmptyRecordStorageSpec extends AnyFlatSpec {
+
+  // EmptyRecordStorage is the null-object branch of
+  // SequentialRecordStorage.getStorage(None). Its reader is backed by a
+  // 0-byte NullInputStream and its writer by NullOutputStream — so the
+  // reader yields zero records WITHOUT touching AmberRuntime.serde
+  // (the iterator's internalNext catches the EOF from readInt() and
+  // returns null before deserialize would run).
+  //
+  // The writer DOES call AmberRuntime.serde.serialize on writeRecord —
+  // but every test in this suite avoids writeRecord, exercising only
+  // the no-AmberRuntime surface (constructor / flush / close / reader
+  // hasNext). This keeps the spec free of the suite-local ActorSystem
+  // setup that ClientEventSpec / CheckpointSubsystemSpec need; the
+  // serde-touching write path is pinned in SequentialRecordStorageSpec
+  // where the harness is set up for it.
+
+  // ---------------------------------------------------------------------------
+  // containsFolder
+  // ---------------------------------------------------------------------------
+
+  "EmptyRecordStorage.containsFolder" should "return false for any folder name" in {
+    val storage = new EmptyRecordStorage[String]()
+    assert(!storage.containsFolder("anything"))
+    assert(!storage.containsFolder(""))
+    assert(!storage.containsFolder("a/b/c"))
+  }
+
+  // ---------------------------------------------------------------------------
+  // deleteStorage
+  // ---------------------------------------------------------------------------
+
+  "EmptyRecordStorage.deleteStorage" should "be a safe no-op" in {
+    val storage = new EmptyRecordStorage[String]()
+    storage.deleteStorage() // must not throw
+    // Idempotent — second call also no-ops.
+    storage.deleteStorage()
+    succeed
+  }
+
+  // ---------------------------------------------------------------------------
+  // getReader — zero-record iterator (no serde dependency)
+  // ---------------------------------------------------------------------------
+
+  "EmptyRecordStorage.getReader" should
+    "return a non-null reader whose iterator is empty (hasNext == false)" in {
+    val storage = new EmptyRecordStorage[String]()
+    val reader = storage.getReader("any-file")
+    assert(reader != null)
+    val iter = reader.mkRecordIterator()
+    assert(!iter.hasNext, "expected an empty iterator from a NullInputStream-backed reader")
+  }
+
+  it should "yield an empty list on toList" in {
+    val storage = new EmptyRecordStorage[String]()
+    val records = storage.getReader("any-file").mkRecordIterator().toList
+    assert(records.isEmpty)
+  }
+
+  it should "produce independent empty iterators across successive getReader calls" in {
+    // Behavior we want pinned: exhausting one reader does not leak state
+    // into a second reader returned by a later getReader call. Independent
+    // of whether the two readers happen to be the same instance —
+    // mkRecordIterator() on each must produce its own empty iterator.
+    val storage = new EmptyRecordStorage[String]()
+    val r1 = storage.getReader("a")
+    val r2 = storage.getReader("a")
+    val _ = r1.mkRecordIterator().toList
+    assert(r2.mkRecordIterator().toList.isEmpty)
+  }
+
+  it should "ignore the fileName argument (every name produces the same empty iterator)" in {
+    // The contract is "any read against an EmptyRecordStorage produces no
+    // records" — regardless of fileName.
+    val storage = new EmptyRecordStorage[String]()
+    assert(storage.getReader("alpha").mkRecordIterator().toList.isEmpty)
+    assert(storage.getReader("beta").mkRecordIterator().toList.isEmpty)
+    assert(storage.getReader("").mkRecordIterator().toList.isEmpty)
+  }
+
+  // ---------------------------------------------------------------------------
+  // getWriter — construction & lifecycle without writeRecord
+  // ---------------------------------------------------------------------------
+
+  "EmptyRecordStorage.getWriter" should "return a non-null writer" in {
+    val storage = new EmptyRecordStorage[String]()
+    val writer = storage.getWriter("any-file")
+    assert(writer != null)
+  }
+
+  it should "allow flush() before any writeRecord without throwing" in {
+    val storage = new EmptyRecordStorage[String]()
+    val writer = storage.getWriter("any-file")
+    writer.flush()
+    succeed
+  }
+
+  it should "allow close() before any writeRecord without throwing" in {
+    val storage = new EmptyRecordStorage[String]()
+    val writer = storage.getWriter("any-file")
+    writer.close()
+    succeed
+  }
+
+  it should "keep a second writer usable after the first is closed" in {
+    // Behavior we want pinned: closing one writer does not invalidate a
+    // second writer returned by a later getWriter call. The two writers
+    // must independently support flush/close without interfering.
+    val storage = new EmptyRecordStorage[String]()
+    val w1 = storage.getWriter("a")
+    val w2 = storage.getWriter("a")
+    w1.close()
+    w2.flush()
+    w2.close()
+    succeed
+  }
+
+  // ---------------------------------------------------------------------------
+  // Type parameter erasure — different T must still produce a working
+  // storage object. EmptyRecordStorage's behavior is independent of T;
+  // pin that with a non-String T to catch any accidental ClassTag misuse.
+  // ---------------------------------------------------------------------------
+
+  "EmptyRecordStorage[T]" should "construct cleanly for a different T (java.lang.Integer)" in {
+    val storage = new EmptyRecordStorage[java.lang.Integer]()
+    assert(!storage.containsFolder("anything"))
+    assert(storage.getReader("x").mkRecordIterator().toList.isEmpty)
+    storage.getWriter("x").close()
+    storage.deleteStorage()
+    succeed
+  }
+}
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/SequentialRecordStorageSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/SequentialRecordStorageSpec.scala
new file mode 100644
index 0000000000..ce4e2fba48
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/SequentialRecordStorageSpec.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.common.storage
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.serialization.{Serialization, SerializationExtension}
+import org.apache.pekko.testkit.TestKit
+import org.apache.texera.amber.engine.common.AmberRuntime
+import org.apache.texera.amber.engine.common.storage.SequentialRecordStorage.{
+  SequentialRecordReader,
+  SequentialRecordWriter
+}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
+import java.nio.file.{Files, Path}
+
+class SequentialRecordStorageSpec extends AnyFlatSpec with BeforeAndAfterAll {
+
+  // ---------------------------------------------------------------------------
+  // Suite-local Pekko serde injected into AmberRuntime via reflection
+  // ---------------------------------------------------------------------------
+  //
+  // `SequentialRecordWriter.writeRecord` / `SequentialRecordReader`'s
+  // iterator both hard-code `AmberRuntime.serde`. Pattern matches
+  // CheckpointSubsystemSpec / ClientEventSpec: own a suite-local
+  // ActorSystem, inject it into AmberRuntime's private vars via
+  // reflection, tear down in afterAll.
+
+  private val testSystem: ActorSystem =
+    ActorSystem("SequentialRecordStorageSpec-test", AmberRuntime.pekkoConfig)
+  private val testSerde: Serialization = SerializationExtension(testSystem)
+
+  private def setAmberRuntimeField(name: String, value: AnyRef): Unit = {
+    val field = AmberRuntime.getClass.getDeclaredField(name)
+    field.setAccessible(true)
+    field.set(AmberRuntime, value)
+  }
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    setAmberRuntimeField("_actorSystem", testSystem)
+    setAmberRuntimeField("_serde", testSerde)
+  }
+
+  override protected def afterAll(): Unit = {
+    setAmberRuntimeField("_serde", null)
+    setAmberRuntimeField("_actorSystem", null)
+    TestKit.shutdownActorSystem(testSystem)
+    super.afterAll()
+  }
+
+  // `Files.walk` returns a closeable Stream backed by an open directory
+  // handle — wrap in try/finally so the handle is released even if
+  // traversal throws, otherwise temp-dir deletion can flake on Windows.
+  private def deleteRecursively(p: Path): Unit = {
+    if (!Files.exists(p)) return
+    val stream = Files.walk(p)
+    try {
+      stream
+        .sorted(java.util.Comparator.reverseOrder())
+        .forEach(child => Files.deleteIfExists(child))
+    } finally {
+      stream.close()
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // SequentialRecordWriter + SequentialRecordReader — in-memory round-trip
+  // ---------------------------------------------------------------------------
+  //
+  // Pin the size-prefixed framing contract directly, using
+  // ByteArrayInput/OutputStream so the test does not depend on any
+  // concrete SequentialRecordStorage subclass (the cross-cutting Reader/
+  // Writer pair is what we're characterizing here).
+
+  "SequentialRecordWriter + SequentialRecordReader" should
+    "round-trip a sequence of records through a size-prefixed binary frame" in {
+    val baos = new ByteArrayOutputStream()
+    val writer = new SequentialRecordWriter[String](new DataOutputStream(baos))
+    writer.writeRecord("alpha")
+    writer.writeRecord("beta")
+    writer.writeRecord("gamma")
+    writer.flush()
+    writer.close()
+
+    val reader = new SequentialRecordReader[String](() =>
+      new DataInputStream(new ByteArrayInputStream(baos.toByteArray))
+    )
+    assert(reader.mkRecordIterator().toList == List("alpha", "beta", "gamma"))
+  }
+
+  it should "round-trip an empty stream as a zero-element iterator" in {
+    val baos = new ByteArrayOutputStream()
+    val writer = new SequentialRecordWriter[String](new DataOutputStream(baos))
+    writer.flush()
+    writer.close()
+    val reader = new SequentialRecordReader[String](() =>
+      new DataInputStream(new ByteArrayInputStream(baos.toByteArray))
+    )
+    assert(reader.mkRecordIterator().toList.isEmpty)
+  }
+
+  it should "round-trip a single record" in {
+    // The size-prefixed format has the same shape for 1 element as for
+    // many, but pinning the 1-record case independently catches an
+    // off-by-one in the iterator's prefetch logic.
+    val baos = new ByteArrayOutputStream()
+    val writer = new SequentialRecordWriter[String](new DataOutputStream(baos))
+    writer.writeRecord("only")
+    writer.flush()
+    writer.close()
+    val reader = new SequentialRecordReader[String](() =>
+      new DataInputStream(new ByteArrayInputStream(baos.toByteArray))
+    )
+    val iter = reader.mkRecordIterator()
+    assert(iter.hasNext)
+    assert(iter.next() == "only")
+    assert(!iter.hasNext, "iterator must report exhaustion after the only element is consumed")
+  }
+
+  it should "support reading the same byte stream more than once via the inputStreamGen thunk" in {
+    // The reader takes a `() => DataInputStream` so it can be re-opened.
+    // Two independent calls to mkRecordIterator must each consume their
+    // own DataInputStream (constructed by the thunk) and produce the
+    // same sequence — proving the thunk is invoked per iterator and that
+    // the reader holds no shared mutable input state.
+    val baos = new ByteArrayOutputStream()
+    val writer = new SequentialRecordWriter[String](new DataOutputStream(baos))
+    writer.writeRecord("a")
+    writer.writeRecord("b")
+    writer.flush()
+    writer.close()
+    val payload = baos.toByteArray
+    val reader = new SequentialRecordReader[String](() =>
+      new DataInputStream(new ByteArrayInputStream(payload))
+    )
+    assert(reader.mkRecordIterator().toList == List("a", "b"))
+    assert(reader.mkRecordIterator().toList == List("a", "b"))
+  }
+
+  // ---------------------------------------------------------------------------
+  // SequentialRecordStorage.fetchAllRecords — companion-level helper
+  // ---------------------------------------------------------------------------
+
+  "SequentialRecordStorage.fetchAllRecords" should
+    "return every record written to the underlying storage in order" in {
+    val tmp = Files.createTempDirectory("seq-storage-fetch-all-")
+    val sub = tmp.resolve("logs")
+    try {
+      val storage = new VFSRecordStorage[String](sub.toUri)
+      val writer = storage.getWriter("file-1")
+      writer.writeRecord("r1")
+      writer.writeRecord("r2")
+      writer.writeRecord("r3")
+      writer.flush(); writer.close()
+
+      val all = SequentialRecordStorage.fetchAllRecords(storage, "file-1").toList
+      assert(all == List("r1", "r2", "r3"))
+    } finally {
+      deleteRecursively(tmp)
+    }
+  }
+
+  it should "return an empty Iterable when the underlying reader has no records" in {
+    val tmp = Files.createTempDirectory("seq-storage-fetch-empty-")
+    val sub = tmp.resolve("logs")
+    try {
+      val storage = new VFSRecordStorage[String](sub.toUri)
+      val writer = storage.getWriter("empty")
+      writer.flush(); writer.close()
+      assert(SequentialRecordStorage.fetchAllRecords(storage, "empty").toList.isEmpty)
+    } finally {
+      deleteRecursively(tmp)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // SequentialRecordStorage.getStorage — factory dispatch
+  // ---------------------------------------------------------------------------
+  //
+  // The factory dispatches on the URI scheme. We pin the two
+  // schemes that can be exercised without external infrastructure
+  // (None → Empty, file:// → VFS). The hdfs:// branch is unit-test-
+  // hostile (HDFSRecordStorage's constructor calls FileSystem.get,
+  // which can block on DNS / network), so it is deliberately left
+  // out of this characterization — the factory's
+  // `if (scheme.toLowerCase == "hdfs")` is a single line and any
+  // regression there would surface immediately in higher-level
+  // checkpoint / fault-tolerance suites that use hdfs:// URIs.
+
+  "SequentialRecordStorage.getStorage" should
+    "return an EmptyRecordStorage when the location is None" in {
+    val storage = SequentialRecordStorage.getStorage[String](None)
+    assert(storage.isInstanceOf[EmptyRecordStorage[_]])
+  }
+
+  it should "return a VFSRecordStorage for a file:// URI" in {
+    val tmp = Files.createTempDirectory("seq-storage-factory-vfs-")
+    val sub = tmp.resolve("logs")
+    try {
+      val storage = SequentialRecordStorage.getStorage[String](Some(sub.toUri))
+      assert(storage.isInstanceOf[VFSRecordStorage[_]])
+      // Round-trip a record through the dispatched VFSRecordStorage to
+      // prove the factory actually returned a usable instance (not a
+      // half-initialized one).
+      val w = storage.getWriter("data")
+      w.writeRecord("payload")
+      w.flush(); w.close()
+      val r = storage.getReader("data").mkRecordIterator().toList
+      assert(r == List("payload"))
+    } finally {
+      deleteRecursively(tmp)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Cross-check — fetchAllRecords composes with the factory-produced storage
+  // ---------------------------------------------------------------------------
+
+  "fetchAllRecords on a factory-produced EmptyRecordStorage" should
+    "yield zero records" in {
+    val storage = SequentialRecordStorage.getStorage[String](None)
+    assert(SequentialRecordStorage.fetchAllRecords(storage, "any").toList.isEmpty)
+  }
+}
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/VFSRecordStorageSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/VFSRecordStorageSpec.scala
new file mode 100644
index 0000000000..258fd268ad
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/VFSRecordStorageSpec.scala
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.common.storage
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.serialization.{Serialization, SerializationExtension}
+import org.apache.pekko.testkit.TestKit
+import org.apache.texera.amber.engine.common.AmberRuntime
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.net.URI
+import java.nio.file.{Files, Path}
+
+class VFSRecordStorageSpec extends AnyFlatSpec with BeforeAndAfterAll {
+
+  // ---------------------------------------------------------------------------
+  // Suite-local Pekko serde injected into AmberRuntime via reflection
+  // ---------------------------------------------------------------------------
+  //
+  // `SequentialRecordWriter.writeRecord` hard-codes `AmberRuntime.serde`,
+  // so any test that round-trips a record needs AmberRuntime initialized.
+  // Pattern matches CheckpointSubsystemSpec / ClientEventSpec — own a
+  // suite-local ActorSystem, inject it into AmberRuntime's private vars,
+  // tear down in afterAll.
+
+  private val testSystem: ActorSystem =
+    ActorSystem("VFSRecordStorageSpec-test", AmberRuntime.pekkoConfig)
+  private val testSerde: Serialization = SerializationExtension(testSystem)
+
+  private def setAmberRuntimeField(name: String, value: AnyRef): Unit = {
+    val field = AmberRuntime.getClass.getDeclaredField(name)
+    field.setAccessible(true)
+    field.set(AmberRuntime, value)
+  }
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    setAmberRuntimeField("_actorSystem", testSystem)
+    setAmberRuntimeField("_serde", testSerde)
+  }
+
+  override protected def afterAll(): Unit = {
+    setAmberRuntimeField("_serde", null)
+    setAmberRuntimeField("_actorSystem", null)
+    TestKit.shutdownActorSystem(testSystem)
+    super.afterAll()
+  }
+
+  // ---------------------------------------------------------------------------
+  // Temp-directory helpers — every test owns its own scratch folder so the
+  // cases are independent and parallel-safe.
+  // ---------------------------------------------------------------------------
+
+  // Returns (sub, uri) where `sub` is the storage folder under a unique
+  // temp root and is NOT yet created on disk (so constructor tests can
+  // pin the auto-create-folder branch). The parent of `sub` IS the unique
+  // temp root, which is what cleanup() removes — keeping the disk clean
+  // even when a test fails before the storage folder gets created.
+  private def mkTempUri(prefix: String): (Path, URI) = {
+    val root = Files.createTempDirectory(s"vfs-record-storage-spec-$prefix-")
+    val sub = root.resolve("logs")
+    (sub, sub.toUri)
+  }
+
+  // Always clean from the parent temp root so any sibling files / partial
+  // state created by a failing test are also removed. `Files.walk` returns
+  // a closeable Stream backed by an open directory handle — wrap in
+  // try/finally so the handle is released even if traversal throws,
+  // otherwise temp-dir deletion can flake on Windows.
+  private def cleanup(sub: Path): Unit = {
+    val root = sub.getParent
+    if (root == null || !Files.exists(root)) return
+    val stream = Files.walk(root)
+    try {
+      stream
+        .sorted(java.util.Comparator.reverseOrder())
+        .forEach(child => Files.deleteIfExists(child))
+    } finally {
+      stream.close()
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Constructor — auto-create folder
+  // ---------------------------------------------------------------------------
+
+  "VFSRecordStorage constructor" should
+    "create the target folder when it does not yet exist" in {
+    val (path, uri) = mkTempUri("auto-create")
+    assert(!Files.exists(path), "precondition: folder must not exist before construction")
+    try {
+      val _ = new VFSRecordStorage[String](uri)
+      assert(Files.exists(path), "constructor should auto-create the folder")
+      assert(Files.isDirectory(path))
+    } finally {
+      cleanup(path)
+    }
+  }
+
+  it should "leave an existing folder intact" in {
+    // If the folder already exists, the constructor's existence check
+    // short-circuits and the folder must not be recreated / wiped.
+    val (path, uri) = mkTempUri("existing")
+    Files.createDirectories(path)
+    val sentinel = path.resolve("sentinel.txt")
+    Files.writeString(sentinel, "keep-me")
+    try {
+      val _ = new VFSRecordStorage[String](uri)
+      assert(Files.exists(sentinel), "constructor must not delete pre-existing files in the folder")
+      assert(Files.readString(sentinel) == "keep-me")
+    } finally {
+      cleanup(path)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // getWriter / getReader — round-trip via the production serde
+  // ---------------------------------------------------------------------------
+
+  "VFSRecordStorage.getWriter + getReader" should
+    "round-trip a sequence of records through a local file:// URI" in {
+    val (path, uri) = mkTempUri("round-trip")
+    try {
+      val storage = new VFSRecordStorage[String](uri)
+      val writer = storage.getWriter("records.bin")
+      writer.writeRecord("one")
+      writer.writeRecord("two")
+      writer.writeRecord("three")
+      writer.flush()
+      writer.close()
+
+      // The file produced by getWriter must be visible on disk under the
+      // configured folder URI (proves we wrote to the right place).
+      assert(Files.exists(path.resolve("records.bin")))
+
+      val records = storage.getReader("records.bin").mkRecordIterator().toList
+      assert(records == List("one", "two", "three"))
+    } finally {
+      cleanup(path)
+    }
+  }
+
+  it should "produce an empty iterator when reading a file containing no records" in {
+    // An empty file (writer opened and closed without writing) must read
+    // back as zero records, not throw.
+    val (path, uri) = mkTempUri("empty-file")
+    try {
+      val storage = new VFSRecordStorage[String](uri)
+      val writer = storage.getWriter("empty.bin")
+      writer.flush()
+      writer.close()
+      assert(Files.exists(path.resolve("empty.bin")))
+      val records = storage.getReader("empty.bin").mkRecordIterator().toList
+      assert(records.isEmpty)
+    } finally {
+      cleanup(path)
+    }
+  }
+
+  it should "support multiple distinct files under the same storage folder" in {
+    // Two writers under the same VFSRecordStorage instance must produce
+    // independent files — no cross-pollination.
+    val (path, uri) = mkTempUri("two-files")
+    try {
+      val storage = new VFSRecordStorage[String](uri)
+      val w1 = storage.getWriter("a.bin")
+      w1.writeRecord("from-a")
+      w1.flush(); w1.close()
+      val w2 = storage.getWriter("b.bin")
+      w2.writeRecord("from-b")
+      w2.flush(); w2.close()
+      assert(storage.getReader("a.bin").mkRecordIterator().toList == List("from-a"))
+      assert(storage.getReader("b.bin").mkRecordIterator().toList == List("from-b"))
+    } finally {
+      cleanup(path)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // deleteStorage
+  // ---------------------------------------------------------------------------
+
+  "VFSRecordStorage.deleteStorage" should
+    "remove the folder created by the constructor along with its contents" in {
+    val (path, uri) = mkTempUri("delete")
+    try {
+      val storage = new VFSRecordStorage[String](uri)
+      val writer = storage.getWriter("data.bin")
+      writer.writeRecord("payload")
+      writer.flush(); writer.close()
+      assert(Files.exists(path.resolve("data.bin")))
+
+      storage.deleteStorage()
+      assert(!Files.exists(path), "deleteStorage should remove the storage folder")
+    } finally {
+      cleanup(path)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // containsFolder
+  // ---------------------------------------------------------------------------
+
+  "VFSRecordStorage.containsFolder" should "return true for an existing sub-folder" in {
+    val (path, uri) = mkTempUri("contains-folder")
+    try {
+      val storage = new VFSRecordStorage[String](uri)
+      // Create the sub-folder AFTER the storage is constructed so the
+      // test pins the live-lookup behavior of containsFolder (not a
+      // value cached at construction time).
+      Files.createDirectory(path.resolve("nested"))
+      assert(storage.containsFolder("nested"))
+    } finally {
+      cleanup(path)
+    }
+  }
+
+  it should "return false for a missing child entry" in {
+    val (path, uri) = mkTempUri("contains-missing")
+    try {
+      val storage = new VFSRecordStorage[String](uri)
+      assert(!storage.containsFolder("does-not-exist"))
+    } finally {
+      cleanup(path)
+    }
+  }
+
+  it should "return false when the child entry exists but is a file (not a folder)" in {
+    // The `containsFolder` contract is "exists AND isFolder". A plain
+    // file with the requested name must NOT register as a folder.
+    val (path, uri) = mkTempUri("contains-file")
+    try {
+      val storage = new VFSRecordStorage[String](uri)
+      Files.writeString(path.resolve("looks-like-folder"), "i am a file")
+      assert(!storage.containsFolder("looks-like-folder"))
+    } finally {
+      cleanup(path)
+    }
+  }
+}
