This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 75b46197d3 test(amber): add unit test coverage for record-storage cluster (#5447)
75b46197d3 is described below
commit 75b46197d3584e0f211d5d3211bef571f5c61e9c
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Jun 7 16:32:10 2026 -0700
test(amber): add unit test coverage for record-storage cluster (#5447)
### What changes were proposed in this PR?
Pin behavior of three previously-uncovered modules in
`engine/common/storage` that sit on the checkpoint / fault-tolerance hot
path via `SequentialRecordStorage.getStorage(...)`. No production-code
changes.
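For context, the factory dispatch that routes callers to these modules can be sketched as follows. This is a hypothetical simplification, not the production `SequentialRecordStorage.getStorage`. `StorageKind`, `EmptyKind`, `HdfsKind`, `VfsKind` and `StorageDispatch` are invented stand-ins that only record which branch was taken. The sketch also assumes that any scheme other than `hdfs` (for example `file://`) falls through to the VFS branch.

```scala
import java.net.URI

// Hypothetical stand-ins for EmptyRecordStorage / HDFSRecordStorage /
// VFSRecordStorage; they only record which branch the factory took.
sealed trait StorageKind
case object EmptyKind extends StorageKind
final case class HdfsKind(uri: URI) extends StorageKind
final case class VfsKind(uri: URI) extends StorageKind

object StorageDispatch {
  // Sketch of the dispatch the specs pin:
  //   None                     -> null object (no I/O at all)
  //   Some(uri), scheme "hdfs" -> HDFS (scheme compared case-insensitively)
  //   Some(uri), otherwise     -> VFS (assumed fall-through, e.g. file://)
  def getStorage(location: Option[URI]): StorageKind =
    location match {
      case None => EmptyKind
      case Some(uri) if Option(uri.getScheme).exists(_.equalsIgnoreCase("hdfs")) =>
        HdfsKind(uri)
      case Some(uri) => VfsKind(uri)
    }
}
```

Only the `None` and `file://` branches are exercised by the new specs. The `hdfs` branch appears here for completeness and is intentionally left untested (see Notes).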
| Spec | Source class | Tests |
| --- | --- | --- |
| `EmptyRecordStorageSpec` | `EmptyRecordStorage` | 11 |
| `VFSRecordStorageSpec` | `VFSRecordStorage` | 9 |
| `SequentialRecordStorageSpec` | `SequentialRecordStorage` (abstract + companion) | 9 |
All three spec files follow the `<srcClassName>Spec.scala` one-to-one
convention.
**Behavior pinned**
| Surface | Contract |
| --- | --- |
| `SequentialRecordStorage.getStorage(None)` | dispatches to `EmptyRecordStorage` |
| `SequentialRecordStorage.getStorage(Some(file://…))` | dispatches to `VFSRecordStorage`; the returned instance round-trips a record |
| `SequentialRecordWriter` / `SequentialRecordReader` | round-trip a sequence of records through the size-prefixed binary frame; the reader's `inputStreamGen` thunk supports re-reading the same byte stream |
| `SequentialRecordStorage.fetchAllRecords` | yields the underlying iterator's records in order (and an empty `Iterable` when nothing was written) |
| `VFSRecordStorage` constructor | auto-creates the target folder; leaves an existing folder and its contents intact |
| `VFSRecordStorage.getWriter` / `getReader` | round-trip records through a local `file://` URI; produce an empty iterator when the file has no records; multiple files under the same folder do not cross-pollinate |
| `VFSRecordStorage.deleteStorage` | removes the on-disk folder created by the constructor, along with its contents |
| `VFSRecordStorage.containsFolder` | distinguishes an existing folder vs. an existing file vs. a missing entry |
| `EmptyRecordStorage.containsFolder` | always returns `false`, regardless of folder name |
| `EmptyRecordStorage.deleteStorage` | is a safe no-op (idempotent) |
| `EmptyRecordStorage.getReader` | yields zero records for any `fileName`; successive `getReader` calls produce independent iterators |
| `EmptyRecordStorage.getWriter` | returns a writer whose `flush()` / `close()` work without `writeRecord` having been called; a second writer is unaffected by closing the first |
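The writer/reader contract in the table can be illustrated with a self-contained sketch. That contract covers length-prefixed frames and a reader that reopens its input through the `inputStreamGen` thunk. This is not the production code. The real `SequentialRecordWriter` / `SequentialRecordReader` serialize each record with `AmberRuntime.serde`. The hypothetical `FrameWriter` / `FrameReader` here instead encode `String`s as UTF-8, so they run without an `ActorSystem`. The 4-byte `writeInt` prefix is an assumption, modeled on the `readInt()` call mentioned in the `EmptyRecordStorageSpec` comments.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, EOFException}
import java.nio.charset.StandardCharsets

// Hypothetical writer: each record becomes [4-byte length][payload].
// Assumption: production uses a similar prefix but serializes via AmberRuntime.serde.
final class FrameWriter(out: DataOutputStream) {
  def writeRecord(record: String): Unit = {
    val bytes = record.getBytes(StandardCharsets.UTF_8)
    out.writeInt(bytes.length)
    out.write(bytes)
  }
  def flush(): Unit = out.flush()
  def close(): Unit = out.close()
}

// Hypothetical reader: the thunk is invoked once per iterator, so two
// iterators never share a stream position.
final class FrameReader(inputStreamGen: () => DataInputStream) {
  def mkRecordIterator(): Iterator[String] = new Iterator[String] {
    private val in = inputStreamGen()
    // Prefetch one record so hasNext is a cheap check.
    private var pending: Option[String] = readFrame()

    private def readFrame(): Option[String] = {
      // EOF on the length prefix means a clean end of stream
      // (a 0-byte input hits this immediately).
      val len =
        try in.readInt()
        catch { case _: EOFException => -1 }
      if (len < 0) {
        in.close()
        None
      } else {
        val buf = new Array[Byte](len)
        in.readFully(buf) // a truncated payload still surfaces as EOFException
        Some(new String(buf, StandardCharsets.UTF_8))
      }
    }

    override def hasNext: Boolean = pending.isDefined

    override def next(): String = {
      val record = pending.getOrElse(throw new NoSuchElementException("no more records"))
      pending = readFrame()
      record
    }
  }
}

object Frames {
  // Write all records through a FrameWriter and return the framed bytes.
  def encode(records: Seq[String]): Array[Byte] = {
    val baos = new ByteArrayOutputStream()
    val writer = new FrameWriter(new DataOutputStream(baos))
    records.foreach(writer.writeRecord)
    writer.flush()
    writer.close()
    baos.toByteArray
  }
}
```

Treating EOF on the length prefix as end-of-stream explains why a 0-byte input yields zero records without ever reaching the deserializer; `EmptyRecordStorage` relies on such an input. Invoking the thunk once per `mkRecordIterator()` call lets the same bytes be read twice.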
**Notes**
- The `hdfs://` dispatch branch of `getStorage` is deliberately left out
— `HDFSRecordStorage`'s constructor calls `FileSystem.get`, which can
block on DNS / network and is unit-test-hostile. The branch is a single
line and any regression there would surface immediately in higher-level
checkpoint / fault-tolerance suites that exercise `hdfs://` URIs.
- The serde-touching paths (`SequentialRecordWriter.writeRecord` /
`SequentialRecordReader`'s iterator) hard-code `AmberRuntime.serde`. The
two specs that exercise this path (`VFSRecordStorageSpec`,
`SequentialRecordStorageSpec`) own a suite-local `ActorSystem` and
inject it into `AmberRuntime` via reflection, tearing it down in
`afterAll` — same pattern as `CheckpointSubsystemSpec` /
`ClientEventSpec`. `EmptyRecordStorageSpec` deliberately avoids
`writeRecord` so it does not need the harness.
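The reflection harness from the second note reduces to a few lines. In this minimal sketch, `FakeRuntime` and `RuntimeInjection` are hypothetical stand-ins for `AmberRuntime` and the specs' `setAmberRuntimeField`. Like the specs, the sketch assumes the singleton's state lives in a private `var` whose JVM field keeps its Scala name (`_serde` here).

```scala
import scala.util.Try

// Hypothetical stand-in for AmberRuntime: state lives in a private var and
// is only readable through an accessor that fails when uninitialized.
object FakeRuntime {
  private var _serde: String = null

  def serde: String = {
    if (_serde == null) throw new IllegalStateException("serde not initialized")
    _serde
  }
}

object RuntimeInjection {
  // Same mechanics as the specs' setAmberRuntimeField: look up the private
  // field on the singleton's runtime class and write it on the module instance.
  def setField(target: AnyRef, name: String, value: AnyRef): Unit = {
    val field = target.getClass.getDeclaredField(name)
    field.setAccessible(true)
    field.set(target, value)
  }

  // Inject for the duration of `body`, then reset to null (the specs reset
  // in afterAll); try/finally guarantees the reset even if body throws.
  def withInjected[A](name: String, value: AnyRef)(body: => A): A = {
    setField(FakeRuntime, name, value)
    try body
    finally setField(FakeRuntime, name, null)
  }
}
```

The `try`/`finally` plays the role of the specs' `beforeAll`/`afterAll` pair. The field is reset to `null` even if the body throws, so later code does not pick up a stale value.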
### Any related issues, documentation, discussions?
Closes #5446.
### How was this PR tested?
Pure unit-test additions; verified locally with:
- `sbt "WorkflowExecutionService/testOnly
org.apache.texera.amber.engine.common.storage.EmptyRecordStorageSpec
org.apache.texera.amber.engine.common.storage.SequentialRecordStorageSpec
org.apache.texera.amber.engine.common.storage.VFSRecordStorageSpec"` —
29 tests, all green
- `sbt scalafmtCheckAll` — clean
- CI to confirm
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Sonnet 4.5)
---
.../common/storage/EmptyRecordStorageSpec.scala | 155 +++++++++++++
.../storage/SequentialRecordStorageSpec.scala | 244 +++++++++++++++++++
.../common/storage/VFSRecordStorageSpec.scala | 258 +++++++++++++++++++++
3 files changed, 657 insertions(+)
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/EmptyRecordStorageSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/EmptyRecordStorageSpec.scala
new file mode 100644
index 0000000000..e3e73fe235
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/EmptyRecordStorageSpec.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.common.storage
+
+import org.scalatest.flatspec.AnyFlatSpec
+
+class EmptyRecordStorageSpec extends AnyFlatSpec {
+
+ // EmptyRecordStorage is the null-object branch of
+ // SequentialRecordStorage.getStorage(None). Its reader is backed by a
+ // 0-byte NullInputStream and its writer by NullOutputStream — so the
+ // reader yields zero records WITHOUT touching AmberRuntime.serde
+ // (the iterator's internalNext catches the EOF from readInt() and
+ // returns null before deserialize would run).
+ //
+ // The writer DOES call AmberRuntime.serde.serialize on writeRecord —
+ // but every test in this suite avoids writeRecord, exercising only
+ // the no-AmberRuntime surface (constructor / flush / close / reader
+ // hasNext). This keeps the spec free of the suite-local ActorSystem
+ // setup that ClientEventSpec / CheckpointSubsystemSpec need; the
+ // serde-touching write path is pinned in SequentialRecordStorageSpec
+ // where the harness is set up for it.
+
+ // ---------------------------------------------------------------------------
+ // containsFolder
+ // ---------------------------------------------------------------------------
+
+ "EmptyRecordStorage.containsFolder" should "return false for any folder name" in {
+ val storage = new EmptyRecordStorage[String]()
+ assert(!storage.containsFolder("anything"))
+ assert(!storage.containsFolder(""))
+ assert(!storage.containsFolder("a/b/c"))
+ }
+
+ // ---------------------------------------------------------------------------
+ // deleteStorage
+ // ---------------------------------------------------------------------------
+
+ "EmptyRecordStorage.deleteStorage" should "be a safe no-op" in {
+ val storage = new EmptyRecordStorage[String]()
+ storage.deleteStorage() // must not throw
+ // Idempotent — second call also no-ops.
+ storage.deleteStorage()
+ succeed
+ }
+
+ // ---------------------------------------------------------------------------
+ // getReader - zero-record iterator (no serde dependency)
+ // ---------------------------------------------------------------------------
+
+ "EmptyRecordStorage.getReader" should
+ "return a non-null reader whose iterator is empty (hasNext == false)" in {
+ val storage = new EmptyRecordStorage[String]()
+ val reader = storage.getReader("any-file")
+ assert(reader != null)
+ val iter = reader.mkRecordIterator()
+ assert(!iter.hasNext, "expected an empty iterator from a NullInputStream-backed reader")
+ }
+
+ it should "yield an empty list on toList" in {
+ val storage = new EmptyRecordStorage[String]()
+ val records = storage.getReader("any-file").mkRecordIterator().toList
+ assert(records.isEmpty)
+ }
+
+ it should "produce independent empty iterators across successive getReader calls" in {
+ // Behavior we want pinned: exhausting one reader does not leak state
+ // into a second reader returned by a later getReader call. Independent
+ // of whether the two readers happen to be the same instance —
+ // mkRecordIterator() on each must produce its own empty iterator.
+ val storage = new EmptyRecordStorage[String]()
+ val r1 = storage.getReader("a")
+ val r2 = storage.getReader("a")
+ val _ = r1.mkRecordIterator().toList
+ assert(r2.mkRecordIterator().toList.isEmpty)
+ }
+
+ it should "ignore the fileName argument (every name produces the same empty iterator)" in {
+ // The contract is "any read against an EmptyRecordStorage produces no
+ // records" — regardless of fileName.
+ val storage = new EmptyRecordStorage[String]()
+ assert(storage.getReader("alpha").mkRecordIterator().toList.isEmpty)
+ assert(storage.getReader("beta").mkRecordIterator().toList.isEmpty)
+ assert(storage.getReader("").mkRecordIterator().toList.isEmpty)
+ }
+
+ // ---------------------------------------------------------------------------
+ // getWriter - construction & lifecycle without writeRecord
+ // ---------------------------------------------------------------------------
+
+ "EmptyRecordStorage.getWriter" should "return a non-null writer" in {
+ val storage = new EmptyRecordStorage[String]()
+ val writer = storage.getWriter("any-file")
+ assert(writer != null)
+ }
+
+ it should "allow flush() before any writeRecord without throwing" in {
+ val storage = new EmptyRecordStorage[String]()
+ val writer = storage.getWriter("any-file")
+ writer.flush()
+ succeed
+ }
+
+ it should "allow close() before any writeRecord without throwing" in {
+ val storage = new EmptyRecordStorage[String]()
+ val writer = storage.getWriter("any-file")
+ writer.close()
+ succeed
+ }
+
+ it should "keep a second writer usable after the first is closed" in {
+ // Behavior we want pinned: closing one writer does not invalidate a
+ // second writer returned by a later getWriter call. The two writers
+ // must independently support flush/close without interfering.
+ val storage = new EmptyRecordStorage[String]()
+ val w1 = storage.getWriter("a")
+ val w2 = storage.getWriter("a")
+ w1.close()
+ w2.flush()
+ w2.close()
+ succeed
+ }
+
+ // ---------------------------------------------------------------------------
+ // Type parameter erasure: a different T must still produce a working
+ // storage object. EmptyRecordStorage's behavior is independent of T;
+ // pin that with a non-String T to catch any accidental ClassTag misuse.
+ // ---------------------------------------------------------------------------
+
+ "EmptyRecordStorage[T]" should "construct cleanly for a different T (java.lang.Integer)" in {
+ val storage = new EmptyRecordStorage[java.lang.Integer]()
+ assert(!storage.containsFolder("anything"))
+ assert(storage.getReader("x").mkRecordIterator().toList.isEmpty)
+ storage.getWriter("x").close()
+ storage.deleteStorage()
+ succeed
+ }
+}
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/SequentialRecordStorageSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/SequentialRecordStorageSpec.scala
new file mode 100644
index 0000000000..ce4e2fba48
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/SequentialRecordStorageSpec.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.common.storage
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.serialization.{Serialization, SerializationExtension}
+import org.apache.pekko.testkit.TestKit
+import org.apache.texera.amber.engine.common.AmberRuntime
+import org.apache.texera.amber.engine.common.storage.SequentialRecordStorage.{
+ SequentialRecordReader,
+ SequentialRecordWriter
+}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
+import java.nio.file.{Files, Path}
+
+class SequentialRecordStorageSpec extends AnyFlatSpec with BeforeAndAfterAll {
+
+ // ---------------------------------------------------------------------------
+ // Suite-local Pekko serde injected into AmberRuntime via reflection
+ // ---------------------------------------------------------------------------
+ //
+ // `SequentialRecordWriter.writeRecord` / `SequentialRecordReader`'s
+ // iterator both hard-code `AmberRuntime.serde`. Pattern matches
+ // CheckpointSubsystemSpec / ClientEventSpec: own a suite-local
+ // ActorSystem, inject it into AmberRuntime's private vars via
+ // reflection, tear down in afterAll.
+
+ private val testSystem: ActorSystem =
+ ActorSystem("SequentialRecordStorageSpec-test", AmberRuntime.pekkoConfig)
+ private val testSerde: Serialization = SerializationExtension(testSystem)
+
+ private def setAmberRuntimeField(name: String, value: AnyRef): Unit = {
+ val field = AmberRuntime.getClass.getDeclaredField(name)
+ field.setAccessible(true)
+ field.set(AmberRuntime, value)
+ }
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ setAmberRuntimeField("_actorSystem", testSystem)
+ setAmberRuntimeField("_serde", testSerde)
+ }
+
+ override protected def afterAll(): Unit = {
+ setAmberRuntimeField("_serde", null)
+ setAmberRuntimeField("_actorSystem", null)
+ TestKit.shutdownActorSystem(testSystem)
+ super.afterAll()
+ }
+
+ // `Files.walk` returns a closeable Stream backed by an open directory
+ // handle — wrap in try/finally so the handle is released even if
+ // traversal throws, otherwise temp-dir deletion can flake on Windows.
+ private def deleteRecursively(p: Path): Unit = {
+ if (!Files.exists(p)) return
+ val stream = Files.walk(p)
+ try {
+ stream
+ .sorted(java.util.Comparator.reverseOrder())
+ .forEach(child => Files.deleteIfExists(child))
+ } finally {
+ stream.close()
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // SequentialRecordWriter + SequentialRecordReader - in-memory round-trip
+ // ---------------------------------------------------------------------------
+ //
+ // Pin the size-prefixed framing contract directly, using
+ // ByteArrayInput/OutputStream so the test does not depend on any
+ // concrete SequentialRecordStorage subclass (the cross-cutting Reader/
+ // Writer pair is what we're characterizing here).
+
+ "SequentialRecordWriter + SequentialRecordReader" should
+ "round-trip a sequence of records through a size-prefixed binary frame" in {
+ val baos = new ByteArrayOutputStream()
+ val writer = new SequentialRecordWriter[String](new DataOutputStream(baos))
+ writer.writeRecord("alpha")
+ writer.writeRecord("beta")
+ writer.writeRecord("gamma")
+ writer.flush()
+ writer.close()
+
+ val reader = new SequentialRecordReader[String](() =>
+ new DataInputStream(new ByteArrayInputStream(baos.toByteArray))
+ )
+ assert(reader.mkRecordIterator().toList == List("alpha", "beta", "gamma"))
+ }
+
+ it should "round-trip an empty stream as a zero-element iterator" in {
+ val baos = new ByteArrayOutputStream()
+ val writer = new SequentialRecordWriter[String](new DataOutputStream(baos))
+ writer.flush()
+ writer.close()
+ val reader = new SequentialRecordReader[String](() =>
+ new DataInputStream(new ByteArrayInputStream(baos.toByteArray))
+ )
+ assert(reader.mkRecordIterator().toList.isEmpty)
+ }
+
+ it should "round-trip a single record" in {
+ // The size-prefixed format has the same shape for 1 element as for
+ // many, but pinning the 1-record case independently catches an
+ // off-by-one in the iterator's prefetch logic.
+ val baos = new ByteArrayOutputStream()
+ val writer = new SequentialRecordWriter[String](new DataOutputStream(baos))
+ writer.writeRecord("only")
+ writer.flush()
+ writer.close()
+ val reader = new SequentialRecordReader[String](() =>
+ new DataInputStream(new ByteArrayInputStream(baos.toByteArray))
+ )
+ val iter = reader.mkRecordIterator()
+ assert(iter.hasNext)
+ assert(iter.next() == "only")
+ assert(!iter.hasNext, "iterator must report exhaustion after the only element is consumed")
+ }
+
+ it should "support reading the same byte stream more than once via the inputStreamGen thunk" in {
+ // The reader takes a `() => DataInputStream` so it can be re-opened.
+ // Two independent calls to mkRecordIterator must each consume their
+ // own DataInputStream (constructed by the thunk) and produce the
+ // same sequence — proving the thunk is invoked per iterator and that
+ // the reader holds no shared mutable input state.
+ val baos = new ByteArrayOutputStream()
+ val writer = new SequentialRecordWriter[String](new DataOutputStream(baos))
+ writer.writeRecord("a")
+ writer.writeRecord("b")
+ writer.flush()
+ writer.close()
+ val payload = baos.toByteArray
+ val reader = new SequentialRecordReader[String](() =>
+ new DataInputStream(new ByteArrayInputStream(payload))
+ )
+ assert(reader.mkRecordIterator().toList == List("a", "b"))
+ assert(reader.mkRecordIterator().toList == List("a", "b"))
+ }
+
+ // ---------------------------------------------------------------------------
+ // SequentialRecordStorage.fetchAllRecords - companion-level helper
+ // ---------------------------------------------------------------------------
+
+ "SequentialRecordStorage.fetchAllRecords" should
+ "return every record written to the underlying storage in order" in {
+ val tmp = Files.createTempDirectory("seq-storage-fetch-all-")
+ val sub = tmp.resolve("logs")
+ try {
+ val storage = new VFSRecordStorage[String](sub.toUri)
+ val writer = storage.getWriter("file-1")
+ writer.writeRecord("r1")
+ writer.writeRecord("r2")
+ writer.writeRecord("r3")
+ writer.flush(); writer.close()
+
+ val all = SequentialRecordStorage.fetchAllRecords(storage, "file-1").toList
+ assert(all == List("r1", "r2", "r3"))
+ } finally {
+ deleteRecursively(tmp)
+ }
+ }
+
+ it should "return an empty Iterable when the underlying reader has no records" in {
+ val tmp = Files.createTempDirectory("seq-storage-fetch-empty-")
+ val sub = tmp.resolve("logs")
+ try {
+ val storage = new VFSRecordStorage[String](sub.toUri)
+ val writer = storage.getWriter("empty")
+ writer.flush(); writer.close()
+ assert(SequentialRecordStorage.fetchAllRecords(storage, "empty").toList.isEmpty)
+ } finally {
+ deleteRecursively(tmp)
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // SequentialRecordStorage.getStorage - factory dispatch
+ // ---------------------------------------------------------------------------
+ //
+ // The factory dispatches on the URI scheme. We pin the two
+ // schemes that can be exercised without external infrastructure
+ // (None → Empty, file:// → VFS). The hdfs:// branch is unit-test-
+ // hostile (HDFSRecordStorage's constructor calls FileSystem.get,
+ // which can block on DNS / network), so it is deliberately left
+ // out of this characterization — the factory's
+ // `if (scheme.toLowerCase == "hdfs")` is a single line and any
+ // regression there would surface immediately in higher-level
+ // checkpoint / fault-tolerance suites that use hdfs:// URIs.
+
+ "SequentialRecordStorage.getStorage" should
+ "return an EmptyRecordStorage when the location is None" in {
+ val storage = SequentialRecordStorage.getStorage[String](None)
+ assert(storage.isInstanceOf[EmptyRecordStorage[_]])
+ }
+
+ it should "return a VFSRecordStorage for a file:// URI" in {
+ val tmp = Files.createTempDirectory("seq-storage-factory-vfs-")
+ val sub = tmp.resolve("logs")
+ try {
+ val storage = SequentialRecordStorage.getStorage[String](Some(sub.toUri))
+ assert(storage.isInstanceOf[VFSRecordStorage[_]])
+ // Round-trip a record through the dispatched VFSRecordStorage to
+ // prove the factory actually returned a usable instance (not a
+ // half-initialized one).
+ val w = storage.getWriter("data")
+ w.writeRecord("payload")
+ w.flush(); w.close()
+ val r = storage.getReader("data").mkRecordIterator().toList
+ assert(r == List("payload"))
+ } finally {
+ deleteRecursively(tmp)
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Cross-check - fetchAllRecords composes with the factory-produced storage
+ // ---------------------------------------------------------------------------
+
+ "fetchAllRecords on a factory-produced EmptyRecordStorage" should
+ "yield zero records" in {
+ val storage = SequentialRecordStorage.getStorage[String](None)
+ assert(SequentialRecordStorage.fetchAllRecords(storage, "any").toList.isEmpty)
+ }
+}
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/VFSRecordStorageSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/VFSRecordStorageSpec.scala
new file mode 100644
index 0000000000..258fd268ad
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/VFSRecordStorageSpec.scala
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.common.storage
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.serialization.{Serialization, SerializationExtension}
+import org.apache.pekko.testkit.TestKit
+import org.apache.texera.amber.engine.common.AmberRuntime
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.net.URI
+import java.nio.file.{Files, Path}
+
+class VFSRecordStorageSpec extends AnyFlatSpec with BeforeAndAfterAll {
+
+ // ---------------------------------------------------------------------------
+ // Suite-local Pekko serde injected into AmberRuntime via reflection
+ // ---------------------------------------------------------------------------
+ //
+ // `SequentialRecordWriter.writeRecord` hard-codes `AmberRuntime.serde`,
+ // so any test that round-trips a record needs AmberRuntime initialized.
+ // Pattern matches CheckpointSubsystemSpec / ClientEventSpec — own a
+ // suite-local ActorSystem, inject it into AmberRuntime's private vars,
+ // tear down in afterAll.
+
+ private val testSystem: ActorSystem =
+ ActorSystem("VFSRecordStorageSpec-test", AmberRuntime.pekkoConfig)
+ private val testSerde: Serialization = SerializationExtension(testSystem)
+
+ private def setAmberRuntimeField(name: String, value: AnyRef): Unit = {
+ val field = AmberRuntime.getClass.getDeclaredField(name)
+ field.setAccessible(true)
+ field.set(AmberRuntime, value)
+ }
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ setAmberRuntimeField("_actorSystem", testSystem)
+ setAmberRuntimeField("_serde", testSerde)
+ }
+
+ override protected def afterAll(): Unit = {
+ setAmberRuntimeField("_serde", null)
+ setAmberRuntimeField("_actorSystem", null)
+ TestKit.shutdownActorSystem(testSystem)
+ super.afterAll()
+ }
+
+ // ---------------------------------------------------------------------------
+ // Temp-directory helpers: every test owns its own scratch folder so the
+ // cases are independent and parallel-safe.
+ // ---------------------------------------------------------------------------
+
+ // Returns (sub, uri) where `sub` is the storage folder under a unique
+ // temp root and is NOT yet created on disk (so constructor tests can
+ // pin the auto-create-folder branch). The parent of `sub` IS the unique
+ // temp root, which is what cleanup() removes — keeping the disk clean
+ // even when a test fails before the storage folder gets created.
+ private def mkTempUri(prefix: String): (Path, URI) = {
+ val root = Files.createTempDirectory(s"vfs-record-storage-spec-$prefix-")
+ val sub = root.resolve("logs")
+ (sub, sub.toUri)
+ }
+
+ // Always clean from the parent temp root so any sibling files / partial
+ // state created by a failing test are also removed. `Files.walk` returns
+ // a closeable Stream backed by an open directory handle — wrap in
+ // try/finally so the handle is released even if traversal throws,
+ // otherwise temp-dir deletion can flake on Windows.
+ private def cleanup(sub: Path): Unit = {
+ val root = sub.getParent
+ if (root == null || !Files.exists(root)) return
+ val stream = Files.walk(root)
+ try {
+ stream
+ .sorted(java.util.Comparator.reverseOrder())
+ .forEach(child => Files.deleteIfExists(child))
+ } finally {
+ stream.close()
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Constructor - auto-create folder
+ // ---------------------------------------------------------------------------
+
+ "VFSRecordStorage constructor" should
+ "create the target folder when it does not yet exist" in {
+ val (path, uri) = mkTempUri("auto-create")
+ assert(!Files.exists(path), "precondition: folder must not exist before construction")
+ try {
+ val _ = new VFSRecordStorage[String](uri)
+ assert(Files.exists(path), "constructor should auto-create the folder")
+ assert(Files.isDirectory(path))
+ } finally {
+ cleanup(path)
+ }
+ }
+
+ it should "leave an existing folder intact" in {
+ // If the folder already exists, the constructor's existence check
+ // short-circuits and the folder must not be recreated / wiped.
+ val (path, uri) = mkTempUri("existing")
+ Files.createDirectories(path)
+ val sentinel = path.resolve("sentinel.txt")
+ Files.writeString(sentinel, "keep-me")
+ try {
+ val _ = new VFSRecordStorage[String](uri)
+ assert(Files.exists(sentinel), "constructor must not delete pre-existing files in the folder")
+ assert(Files.readString(sentinel) == "keep-me")
+ } finally {
+ cleanup(path)
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // getWriter / getReader - round-trip via the production serde
+ // ---------------------------------------------------------------------------
+
+ "VFSRecordStorage.getWriter + getReader" should
+ "round-trip a sequence of records through a local file:// URI" in {
+ val (path, uri) = mkTempUri("round-trip")
+ try {
+ val storage = new VFSRecordStorage[String](uri)
+ val writer = storage.getWriter("records.bin")
+ writer.writeRecord("one")
+ writer.writeRecord("two")
+ writer.writeRecord("three")
+ writer.flush()
+ writer.close()
+
+ // The file produced by getWriter must be visible on disk under the
+ // configured folder URI (proves we wrote to the right place).
+ assert(Files.exists(path.resolve("records.bin")))
+
+ val records = storage.getReader("records.bin").mkRecordIterator().toList
+ assert(records == List("one", "two", "three"))
+ } finally {
+ cleanup(path)
+ }
+ }
+
+ it should "produce an empty iterator when reading a file containing no records" in {
+ // An empty file (writer opened and closed without writing) must read
+ // back as zero records, not throw.
+ val (path, uri) = mkTempUri("empty-file")
+ try {
+ val storage = new VFSRecordStorage[String](uri)
+ val writer = storage.getWriter("empty.bin")
+ writer.flush()
+ writer.close()
+ assert(Files.exists(path.resolve("empty.bin")))
+ val records = storage.getReader("empty.bin").mkRecordIterator().toList
+ assert(records.isEmpty)
+ } finally {
+ cleanup(path)
+ }
+ }
+
+ it should "support multiple distinct files under the same storage folder" in {
+ // Two writers under the same VFSRecordStorage instance must produce
+ // independent files — no cross-pollination.
+ val (path, uri) = mkTempUri("two-files")
+ try {
+ val storage = new VFSRecordStorage[String](uri)
+ val w1 = storage.getWriter("a.bin")
+ w1.writeRecord("from-a")
+ w1.flush(); w1.close()
+ val w2 = storage.getWriter("b.bin")
+ w2.writeRecord("from-b")
+ w2.flush(); w2.close()
+ assert(storage.getReader("a.bin").mkRecordIterator().toList == List("from-a"))
+ assert(storage.getReader("b.bin").mkRecordIterator().toList == List("from-b"))
+ } finally {
+ cleanup(path)
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // deleteStorage
+ // ---------------------------------------------------------------------------
+
+ "VFSRecordStorage.deleteStorage" should
+ "remove the folder created by the constructor along with its contents" in {
+ val (path, uri) = mkTempUri("delete")
+ try {
+ val storage = new VFSRecordStorage[String](uri)
+ val writer = storage.getWriter("data.bin")
+ writer.writeRecord("payload")
+ writer.flush(); writer.close()
+ assert(Files.exists(path.resolve("data.bin")))
+
+ storage.deleteStorage()
+ assert(!Files.exists(path), "deleteStorage should remove the storage folder")
+ } finally {
+ cleanup(path)
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // containsFolder
+ // ---------------------------------------------------------------------------
+
+ "VFSRecordStorage.containsFolder" should "return true for an existing sub-folder" in {
+ val (path, uri) = mkTempUri("contains-folder")
+ try {
+ val storage = new VFSRecordStorage[String](uri)
+ // Create the sub-folder AFTER the storage is constructed so the
+ // test pins the live-lookup behavior of containsFolder (not a
+ // value cached at construction time).
+ Files.createDirectory(path.resolve("nested"))
+ assert(storage.containsFolder("nested"))
+ } finally {
+ cleanup(path)
+ }
+ }
+
+ it should "return false for a missing child entry" in {
+ val (path, uri) = mkTempUri("contains-missing")
+ try {
+ val storage = new VFSRecordStorage[String](uri)
+ assert(!storage.containsFolder("does-not-exist"))
+ } finally {
+ cleanup(path)
+ }
+ }
+
+ it should "return false when the child entry exists but is a file (not a folder)" in {
+ // The `containsFolder` contract is "exists AND isFolder". A plain
+ // file with the requested name must NOT register as a folder.
+ val (path, uri) = mkTempUri("contains-file")
+ try {
+ val storage = new VFSRecordStorage[String](uri)
+ Files.writeString(path.resolve("looks-like-folder"), "i am a file")
+ assert(!storage.containsFolder("looks-like-folder"))
+ } finally {
+ cleanup(path)
+ }
+ }
+}