This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5447-dd8ab8ae946d2c062faf0e0494235169feeedd22
in repository https://gitbox.apache.org/repos/asf/texera.git
commit 75b46197d3584e0f211d5d3211bef571f5c61e9c
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Jun 7 16:32:10 2026 -0700

test(amber): add unit test coverage for record-storage cluster (#5447)

### What changes were proposed in this PR?

Pin behavior of three previously-uncovered modules in `engine/common/storage` that sit on the checkpoint / fault-tolerance hot path via `SequentialRecordStorage.getStorage(...)`. No production-code changes.

| Spec | Source class | Tests |
| --- | --- | --- |
| `EmptyRecordStorageSpec` | `EmptyRecordStorage` | 11 |
| `VFSRecordStorageSpec` | `VFSRecordStorage` | 9 |
| `SequentialRecordStorageSpec` | `SequentialRecordStorage` (abstract + companion) | 9 |

All three spec files follow the `<srcClassName>Spec.scala` one-to-one convention.

**Behavior pinned**

| Surface | Contract |
| --- | --- |
| `SequentialRecordStorage.getStorage(None)` | dispatches to `EmptyRecordStorage` |
| `SequentialRecordStorage.getStorage(Some(file://…))` | dispatches to `VFSRecordStorage` and the returned instance round-trips a record |
| `SequentialRecordWriter` / `SequentialRecordReader` | round-trip a sequence of records through the size-prefixed binary frame; the reader's `inputStreamGen` thunk supports re-reading the same byte stream |
| `SequentialRecordStorage.fetchAllRecords` | yields the underlying iterator's records in order (and `Iterable.empty` when nothing was written) |
| `VFSRecordStorage` constructor | auto-creates the target folder; leaves an existing folder + contents intact |
| `VFSRecordStorage.getWriter` / `getReader` | round-trip records through a local `file://` URI; produce empty iterator when the file has no records; multiple files under the same folder do not cross-pollinate |
| `VFSRecordStorage.deleteStorage` | removes the on-disk folder created by the constructor |
| `VFSRecordStorage.containsFolder` | distinguishes existing folder vs. existing file vs. missing entry |
| `EmptyRecordStorage.containsFolder` | always returns `false` regardless of folder name |
| `EmptyRecordStorage.deleteStorage` | is a safe no-op (idempotent) |
| `EmptyRecordStorage.getReader` | yields zero records for any fileName; successive `getReader` calls produce independent iterators |
| `EmptyRecordStorage.getWriter` | returns a writer whose `flush()` / `close()` work without `writeRecord` having been called; a second writer is unaffected by closing the first |

**Notes**

- The `hdfs://` dispatch branch of `getStorage` is deliberately left out — `HDFSRecordStorage`'s constructor calls `FileSystem.get`, which can block on DNS / network and is unit-test-hostile. The branch is a single line and any regression there would surface immediately in higher-level checkpoint / fault-tolerance suites that exercise `hdfs://` URIs.
- The serde-touching paths (`SequentialRecordWriter.writeRecord` / `SequentialRecordReader`'s iterator) hard-code `AmberRuntime.serde`. The two specs that exercise this path (`VFSRecordStorageSpec`, `SequentialRecordStorageSpec`) own a suite-local `ActorSystem` and inject it into `AmberRuntime` via reflection, tearing it down in `afterAll` — same pattern as `CheckpointSubsystemSpec` / `ClientEventSpec`. `EmptyRecordStorageSpec` deliberately avoids `writeRecord` so it does not need the harness.

### Any related issues, documentation, discussions?

Closes #5446.

### How was this PR tested?

Pure unit-test additions; verified locally with:

- `sbt "WorkflowExecutionService/testOnly org.apache.texera.amber.engine.common.storage.EmptyRecordStorageSpec org.apache.texera.amber.engine.common.storage.SequentialRecordStorageSpec org.apache.texera.amber.engine.common.storage.VFSRecordStorageSpec"` — 29 tests, all green
- `sbt scalafmtCheckAll` — clean
- CI to confirm

### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Sonnet 4.5)
---
 .../common/storage/EmptyRecordStorageSpec.scala   | 155 +++++++++++++
 .../storage/SequentialRecordStorageSpec.scala     | 244 +++++++++++++++++++
 .../common/storage/VFSRecordStorageSpec.scala     | 258 +++++++++++++++++++++
 3 files changed, 657 insertions(+)

diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/EmptyRecordStorageSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/EmptyRecordStorageSpec.scala
new file mode 100644
index 0000000000..e3e73fe235
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/EmptyRecordStorageSpec.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.common.storage
+
+import org.scalatest.flatspec.AnyFlatSpec
+
+class EmptyRecordStorageSpec extends AnyFlatSpec {
+
+  // EmptyRecordStorage is the null-object branch of
+  // SequentialRecordStorage.getStorage(None). Its reader is backed by a
+  // 0-byte NullInputStream and its writer by NullOutputStream — so the
+  // reader yields zero records WITHOUT touching AmberRuntime.serde
+  // (the iterator's internalNext catches the EOF from readInt() and
+  // returns null before deserialize would run).
+  //
+  // The writer DOES call AmberRuntime.serde.serialize on writeRecord —
+  // but every test in this suite avoids writeRecord, exercising only
+  // the no-AmberRuntime surface (constructor / flush / close / reader
+  // hasNext). This keeps the spec free of the suite-local ActorSystem
+  // setup that ClientEventSpec / CheckpointSubsystemSpec need; the
+  // serde-touching write path is pinned in SequentialRecordStorageSpec
+  // where the harness is set up for it.
+
+  // ---------------------------------------------------------------------------
+  // containsFolder
+  // ---------------------------------------------------------------------------
+
+  "EmptyRecordStorage.containsFolder" should "return false for any folder name" in {
+    val storage = new EmptyRecordStorage[String]()
+    assert(!storage.containsFolder("anything"))
+    assert(!storage.containsFolder(""))
+    assert(!storage.containsFolder("a/b/c"))
+  }
+
+  // ---------------------------------------------------------------------------
+  // deleteStorage
+  // ---------------------------------------------------------------------------
+
+  "EmptyRecordStorage.deleteStorage" should "be a safe no-op" in {
+    val storage = new EmptyRecordStorage[String]()
+    storage.deleteStorage() // must not throw
+    // Idempotent — second call also no-ops.
+    storage.deleteStorage()
+    succeed
+  }
+
+  // ---------------------------------------------------------------------------
+  // getReader — zero-record iterator (no serde dependency)
+  // ---------------------------------------------------------------------------
+
+  "EmptyRecordStorage.getReader" should
+    "return a non-null reader whose iterator is empty (hasNext == false)" in {
+      val storage = new EmptyRecordStorage[String]()
+      val reader = storage.getReader("any-file")
+      assert(reader != null)
+      val iter = reader.mkRecordIterator()
+      assert(!iter.hasNext, "expected an empty iterator from a NullInputStream-backed reader")
+    }
+
+  it should "yield an empty list on toList" in {
+    val storage = new EmptyRecordStorage[String]()
+    val records = storage.getReader("any-file").mkRecordIterator().toList
+    assert(records.isEmpty)
+  }
+
+  it should "produce independent empty iterators across successive getReader calls" in {
+    // Behavior we want pinned: exhausting one reader does not leak state
+    // into a second reader returned by a later getReader call. Independent
+    // of whether the two readers happen to be the same instance —
+    // mkRecordIterator() on each must produce its own empty iterator.
+    val storage = new EmptyRecordStorage[String]()
+    val r1 = storage.getReader("a")
+    val r2 = storage.getReader("a")
+    val _ = r1.mkRecordIterator().toList
+    assert(r2.mkRecordIterator().toList.isEmpty)
+  }
+
+  it should "ignore the fileName argument (every name produces the same empty iterator)" in {
+    // The contract is "any read against an EmptyRecordStorage produces no
+    // records" — regardless of fileName.
+    val storage = new EmptyRecordStorage[String]()
+    assert(storage.getReader("alpha").mkRecordIterator().toList.isEmpty)
+    assert(storage.getReader("beta").mkRecordIterator().toList.isEmpty)
+    assert(storage.getReader("").mkRecordIterator().toList.isEmpty)
+  }
+
+  // ---------------------------------------------------------------------------
+  // getWriter — construction & lifecycle without writeRecord
+  // ---------------------------------------------------------------------------
+
+  "EmptyRecordStorage.getWriter" should "return a non-null writer" in {
+    val storage = new EmptyRecordStorage[String]()
+    val writer = storage.getWriter("any-file")
+    assert(writer != null)
+  }
+
+  it should "allow flush() before any writeRecord without throwing" in {
+    val storage = new EmptyRecordStorage[String]()
+    val writer = storage.getWriter("any-file")
+    writer.flush()
+    succeed
+  }
+
+  it should "allow close() before any writeRecord without throwing" in {
+    val storage = new EmptyRecordStorage[String]()
+    val writer = storage.getWriter("any-file")
+    writer.close()
+    succeed
+  }
+
+  it should "keep a second writer usable after the first is closed" in {
+    // Behavior we want pinned: closing one writer does not invalidate a
+    // second writer returned by a later getWriter call. The two writers
+    // must independently support flush/close without interfering.
+    val storage = new EmptyRecordStorage[String]()
+    val w1 = storage.getWriter("a")
+    val w2 = storage.getWriter("a")
+    w1.close()
+    w2.flush()
+    w2.close()
+    succeed
+  }
+
+  // ---------------------------------------------------------------------------
+  // Type parameter erasure — different T must still produce a working
+  // storage object. EmptyRecordStorage's behavior is independent of T;
+  // pin that with a non-String T to catch any accidental ClassTag misuse.
+  // ---------------------------------------------------------------------------
+
+  "EmptyRecordStorage[T]" should "construct cleanly for a different T (java.lang.Integer)" in {
+    val storage = new EmptyRecordStorage[java.lang.Integer]()
+    assert(!storage.containsFolder("anything"))
+    assert(storage.getReader("x").mkRecordIterator().toList.isEmpty)
+    storage.getWriter("x").close()
+    storage.deleteStorage()
+    succeed
+  }
+}
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/SequentialRecordStorageSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/SequentialRecordStorageSpec.scala
new file mode 100644
index 0000000000..ce4e2fba48
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/SequentialRecordStorageSpec.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.common.storage
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.serialization.{Serialization, SerializationExtension}
+import org.apache.pekko.testkit.TestKit
+import org.apache.texera.amber.engine.common.AmberRuntime
+import org.apache.texera.amber.engine.common.storage.SequentialRecordStorage.{
+  SequentialRecordReader,
+  SequentialRecordWriter
+}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
+import java.nio.file.{Files, Path}
+
+class SequentialRecordStorageSpec extends AnyFlatSpec with BeforeAndAfterAll {
+
+  // ---------------------------------------------------------------------------
+  // Suite-local Pekko serde injected into AmberRuntime via reflection
+  // ---------------------------------------------------------------------------
+  //
+  // `SequentialRecordWriter.writeRecord` / `SequentialRecordReader`'s
+  // iterator both hard-code `AmberRuntime.serde`. Pattern matches
+  // CheckpointSubsystemSpec / ClientEventSpec: own a suite-local
+  // ActorSystem, inject it into AmberRuntime's private vars via
+  // reflection, tear down in afterAll.
+
+  private val testSystem: ActorSystem =
+    ActorSystem("SequentialRecordStorageSpec-test", AmberRuntime.pekkoConfig)
+  private val testSerde: Serialization = SerializationExtension(testSystem)
+
+  private def setAmberRuntimeField(name: String, value: AnyRef): Unit = {
+    val field = AmberRuntime.getClass.getDeclaredField(name)
+    field.setAccessible(true)
+    field.set(AmberRuntime, value)
+  }
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    setAmberRuntimeField("_actorSystem", testSystem)
+    setAmberRuntimeField("_serde", testSerde)
+  }
+
+  override protected def afterAll(): Unit = {
+    setAmberRuntimeField("_serde", null)
+    setAmberRuntimeField("_actorSystem", null)
+    TestKit.shutdownActorSystem(testSystem)
+    super.afterAll()
+  }
+
+  // `Files.walk` returns a closeable Stream backed by an open directory
+  // handle — wrap in try/finally so the handle is released even if
+  // traversal throws, otherwise temp-dir deletion can flake on Windows.
+  private def deleteRecursively(p: Path): Unit = {
+    if (!Files.exists(p)) return
+    val stream = Files.walk(p)
+    try {
+      stream
+        .sorted(java.util.Comparator.reverseOrder())
+        .forEach(child => Files.deleteIfExists(child))
+    } finally {
+      stream.close()
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // SequentialRecordWriter + SequentialRecordReader — in-memory round-trip
+  // ---------------------------------------------------------------------------
+  //
+  // Pin the size-prefixed framing contract directly, using
+  // ByteArrayInput/OutputStream so the test does not depend on any
+  // concrete SequentialRecordStorage subclass (the cross-cutting Reader/
+  // Writer pair is what we're characterizing here).
+
+  "SequentialRecordWriter + SequentialRecordReader" should
+    "round-trip a sequence of records through a size-prefixed binary frame" in {
+      val baos = new ByteArrayOutputStream()
+      val writer = new SequentialRecordWriter[String](new DataOutputStream(baos))
+      writer.writeRecord("alpha")
+      writer.writeRecord("beta")
+      writer.writeRecord("gamma")
+      writer.flush()
+      writer.close()
+
+      val reader = new SequentialRecordReader[String](() =>
+        new DataInputStream(new ByteArrayInputStream(baos.toByteArray))
+      )
+      assert(reader.mkRecordIterator().toList == List("alpha", "beta", "gamma"))
+    }
+
+  it should "round-trip an empty stream as a zero-element iterator" in {
+    val baos = new ByteArrayOutputStream()
+    val writer = new SequentialRecordWriter[String](new DataOutputStream(baos))
+    writer.flush()
+    writer.close()
+    val reader = new SequentialRecordReader[String](() =>
+      new DataInputStream(new ByteArrayInputStream(baos.toByteArray))
+    )
+    assert(reader.mkRecordIterator().toList.isEmpty)
+  }
+
+  it should "round-trip a single record" in {
+    // The size-prefixed format has the same shape for 1 element as for
+    // many, but pinning the 1-record case independently catches an
+    // off-by-one in the iterator's prefetch logic.
+    val baos = new ByteArrayOutputStream()
+    val writer = new SequentialRecordWriter[String](new DataOutputStream(baos))
+    writer.writeRecord("only")
+    writer.flush()
+    writer.close()
+    val reader = new SequentialRecordReader[String](() =>
+      new DataInputStream(new ByteArrayInputStream(baos.toByteArray))
+    )
+    val iter = reader.mkRecordIterator()
+    assert(iter.hasNext)
+    assert(iter.next() == "only")
+    assert(!iter.hasNext, "iterator must report exhaustion after the only element is consumed")
+  }
+
+  it should "support reading the same byte stream more than once via the inputStreamGen thunk" in {
+    // The reader takes a `() => DataInputStream` so it can be re-opened.
+    // Two independent calls to mkRecordIterator must each consume their
+    // own DataInputStream (constructed by the thunk) and produce the
+    // same sequence — proving the thunk is invoked per iterator and that
+    // the reader holds no shared mutable input state.
+    val baos = new ByteArrayOutputStream()
+    val writer = new SequentialRecordWriter[String](new DataOutputStream(baos))
+    writer.writeRecord("a")
+    writer.writeRecord("b")
+    writer.flush()
+    writer.close()
+    val payload = baos.toByteArray
+    val reader = new SequentialRecordReader[String](() =>
+      new DataInputStream(new ByteArrayInputStream(payload))
+    )
+    assert(reader.mkRecordIterator().toList == List("a", "b"))
+    assert(reader.mkRecordIterator().toList == List("a", "b"))
+  }
+
+  // ---------------------------------------------------------------------------
+  // SequentialRecordStorage.fetchAllRecords — companion-level helper
+  // ---------------------------------------------------------------------------
+
+  "SequentialRecordStorage.fetchAllRecords" should
+    "return every record written to the underlying storage in order" in {
+      val tmp = Files.createTempDirectory("seq-storage-fetch-all-")
+      val sub = tmp.resolve("logs")
+      try {
+        val storage = new VFSRecordStorage[String](sub.toUri)
+        val writer = storage.getWriter("file-1")
+        writer.writeRecord("r1")
+        writer.writeRecord("r2")
+        writer.writeRecord("r3")
+        writer.flush(); writer.close()
+
+        val all = SequentialRecordStorage.fetchAllRecords(storage, "file-1").toList
+        assert(all == List("r1", "r2", "r3"))
+      } finally {
+        deleteRecursively(tmp)
+      }
+    }
+
+  it should "return an empty Iterable when the underlying reader has no records" in {
+    val tmp = Files.createTempDirectory("seq-storage-fetch-empty-")
+    val sub = tmp.resolve("logs")
+    try {
+      val storage = new VFSRecordStorage[String](sub.toUri)
+      val writer = storage.getWriter("empty")
+      writer.flush(); writer.close()
+      assert(SequentialRecordStorage.fetchAllRecords(storage, "empty").toList.isEmpty)
+    } finally {
+      deleteRecursively(tmp)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // SequentialRecordStorage.getStorage — factory dispatch
+  // ---------------------------------------------------------------------------
+  //
+  // The factory dispatches on the URI scheme. We pin the two
+  // schemes that can be exercised without external infrastructure
+  // (None → Empty, file:// → VFS). The hdfs:// branch is unit-test-
+  // hostile (HDFSRecordStorage's constructor calls FileSystem.get,
+  // which can block on DNS / network), so it is deliberately left
+  // out of this characterization — the factory's
+  // `if (scheme.toLowerCase == "hdfs")` is a single line and any
+  // regression there would surface immediately in higher-level
+  // checkpoint / fault-tolerance suites that use hdfs:// URIs.
+
+  "SequentialRecordStorage.getStorage" should
+    "return an EmptyRecordStorage when the location is None" in {
+      val storage = SequentialRecordStorage.getStorage[String](None)
+      assert(storage.isInstanceOf[EmptyRecordStorage[_]])
+    }
+
+  it should "return a VFSRecordStorage for a file:// URI" in {
+    val tmp = Files.createTempDirectory("seq-storage-factory-vfs-")
+    val sub = tmp.resolve("logs")
+    try {
+      val storage = SequentialRecordStorage.getStorage[String](Some(sub.toUri))
+      assert(storage.isInstanceOf[VFSRecordStorage[_]])
+      // Round-trip a record through the dispatched VFSRecordStorage to
+      // prove the factory actually returned a usable instance (not a
+      // half-initialized one).
+      val w = storage.getWriter("data")
+      w.writeRecord("payload")
+      w.flush(); w.close()
+      val r = storage.getReader("data").mkRecordIterator().toList
+      assert(r == List("payload"))
+    } finally {
+      deleteRecursively(tmp)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Cross-check — fetchAllRecords composes with the factory-produced storage
+  // ---------------------------------------------------------------------------
+
+  "fetchAllRecords on a factory-produced EmptyRecordStorage" should
+    "yield zero records" in {
+      val storage = SequentialRecordStorage.getStorage[String](None)
+      assert(SequentialRecordStorage.fetchAllRecords(storage, "any").toList.isEmpty)
+    }
+}
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/VFSRecordStorageSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/VFSRecordStorageSpec.scala
new file mode 100644
index 0000000000..258fd268ad
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/common/storage/VFSRecordStorageSpec.scala
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.common.storage
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.serialization.{Serialization, SerializationExtension}
+import org.apache.pekko.testkit.TestKit
+import org.apache.texera.amber.engine.common.AmberRuntime
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.net.URI
+import java.nio.file.{Files, Path}
+
+class VFSRecordStorageSpec extends AnyFlatSpec with BeforeAndAfterAll {
+
+  // ---------------------------------------------------------------------------
+  // Suite-local Pekko serde injected into AmberRuntime via reflection
+  // ---------------------------------------------------------------------------
+  //
+  // `SequentialRecordWriter.writeRecord` hard-codes `AmberRuntime.serde`,
+  // so any test that round-trips a record needs AmberRuntime initialized.
+  // Pattern matches CheckpointSubsystemSpec / ClientEventSpec — own a
+  // suite-local ActorSystem, inject it into AmberRuntime's private vars,
+  // tear down in afterAll.
+
+  private val testSystem: ActorSystem =
+    ActorSystem("VFSRecordStorageSpec-test", AmberRuntime.pekkoConfig)
+  private val testSerde: Serialization = SerializationExtension(testSystem)
+
+  private def setAmberRuntimeField(name: String, value: AnyRef): Unit = {
+    val field = AmberRuntime.getClass.getDeclaredField(name)
+    field.setAccessible(true)
+    field.set(AmberRuntime, value)
+  }
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    setAmberRuntimeField("_actorSystem", testSystem)
+    setAmberRuntimeField("_serde", testSerde)
+  }
+
+  override protected def afterAll(): Unit = {
+    setAmberRuntimeField("_serde", null)
+    setAmberRuntimeField("_actorSystem", null)
+    TestKit.shutdownActorSystem(testSystem)
+    super.afterAll()
+  }
+
+  // ---------------------------------------------------------------------------
+  // Temp-directory helpers — every test owns its own scratch folder so the
+  // cases are independent and parallel-safe.
+  // ---------------------------------------------------------------------------
+
+  // Returns (sub, uri) where `sub` is the storage folder under a unique
+  // temp root and is NOT yet created on disk (so constructor tests can
+  // pin the auto-create-folder branch). The parent of `sub` IS the unique
+  // temp root, which is what cleanup() removes — keeping the disk clean
+  // even when a test fails before the storage folder gets created.
+  private def mkTempUri(prefix: String): (Path, URI) = {
+    val root = Files.createTempDirectory(s"vfs-record-storage-spec-$prefix-")
+    val sub = root.resolve("logs")
+    (sub, sub.toUri)
+  }
+
+  // Always clean from the parent temp root so any sibling files / partial
+  // state created by a failing test are also removed. `Files.walk` returns
+  // a closeable Stream backed by an open directory handle — wrap in
+  // try/finally so the handle is released even if traversal throws,
+  // otherwise temp-dir deletion can flake on Windows.
+  private def cleanup(sub: Path): Unit = {
+    val root = sub.getParent
+    if (root == null || !Files.exists(root)) return
+    val stream = Files.walk(root)
+    try {
+      stream
+        .sorted(java.util.Comparator.reverseOrder())
+        .forEach(child => Files.deleteIfExists(child))
+    } finally {
+      stream.close()
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Constructor — auto-create folder
+  // ---------------------------------------------------------------------------
+
+  "VFSRecordStorage constructor" should
+    "create the target folder when it does not yet exist" in {
+      val (path, uri) = mkTempUri("auto-create")
+      assert(!Files.exists(path), "precondition: folder must not exist before construction")
+      try {
+        val _ = new VFSRecordStorage[String](uri)
+        assert(Files.exists(path), "constructor should auto-create the folder")
+        assert(Files.isDirectory(path))
+      } finally {
+        cleanup(path)
+      }
+    }
+
+  it should "leave an existing folder intact" in {
+    // If the folder already exists, the constructor's existence check
+    // short-circuits and the folder must not be recreated / wiped.
+    val (path, uri) = mkTempUri("existing")
+    Files.createDirectories(path)
+    val sentinel = path.resolve("sentinel.txt")
+    Files.writeString(sentinel, "keep-me")
+    try {
+      val _ = new VFSRecordStorage[String](uri)
+      assert(Files.exists(sentinel), "constructor must not delete pre-existing files in the folder")
+      assert(Files.readString(sentinel) == "keep-me")
+    } finally {
+      cleanup(path)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // getWriter / getReader — round-trip via the production serde
+  // ---------------------------------------------------------------------------
+
+  "VFSRecordStorage.getWriter + getReader" should
+    "round-trip a sequence of records through a local file:// URI" in {
+      val (path, uri) = mkTempUri("round-trip")
+      try {
+        val storage = new VFSRecordStorage[String](uri)
+        val writer = storage.getWriter("records.bin")
+        writer.writeRecord("one")
+        writer.writeRecord("two")
+        writer.writeRecord("three")
+        writer.flush()
+        writer.close()
+
+        // The file produced by getWriter must be visible on disk under the
+        // configured folder URI (proves we wrote to the right place).
+        assert(Files.exists(path.resolve("records.bin")))
+
+        val records = storage.getReader("records.bin").mkRecordIterator().toList
+        assert(records == List("one", "two", "three"))
+      } finally {
+        cleanup(path)
+      }
+    }
+
+  it should "produce an empty iterator when reading a file containing no records" in {
+    // An empty file (writer opened and closed without writing) must read
+    // back as zero records, not throw.
+    val (path, uri) = mkTempUri("empty-file")
+    try {
+      val storage = new VFSRecordStorage[String](uri)
+      val writer = storage.getWriter("empty.bin")
+      writer.flush()
+      writer.close()
+      assert(Files.exists(path.resolve("empty.bin")))
+      val records = storage.getReader("empty.bin").mkRecordIterator().toList
+      assert(records.isEmpty)
+    } finally {
+      cleanup(path)
+    }
+  }
+
+  it should "support multiple distinct files under the same storage folder" in {
+    // Two writers under the same VFSRecordStorage instance must produce
+    // independent files — no cross-pollination.
+    val (path, uri) = mkTempUri("two-files")
+    try {
+      val storage = new VFSRecordStorage[String](uri)
+      val w1 = storage.getWriter("a.bin")
+      w1.writeRecord("from-a")
+      w1.flush(); w1.close()
+      val w2 = storage.getWriter("b.bin")
+      w2.writeRecord("from-b")
+      w2.flush(); w2.close()
+      assert(storage.getReader("a.bin").mkRecordIterator().toList == List("from-a"))
+      assert(storage.getReader("b.bin").mkRecordIterator().toList == List("from-b"))
+    } finally {
+      cleanup(path)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // deleteStorage
+  // ---------------------------------------------------------------------------
+
+  "VFSRecordStorage.deleteStorage" should
+    "remove the folder created by the constructor along with its contents" in {
+      val (path, uri) = mkTempUri("delete")
+      try {
+        val storage = new VFSRecordStorage[String](uri)
+        val writer = storage.getWriter("data.bin")
+        writer.writeRecord("payload")
+        writer.flush(); writer.close()
+        assert(Files.exists(path.resolve("data.bin")))
+
+        storage.deleteStorage()
+        assert(!Files.exists(path), "deleteStorage should remove the storage folder")
+      } finally {
+        cleanup(path)
+      }
+    }
+
+  // ---------------------------------------------------------------------------
+  // containsFolder
+  // ---------------------------------------------------------------------------
+
+  "VFSRecordStorage.containsFolder" should "return true for an existing sub-folder" in {
+    val (path, uri) = mkTempUri("contains-folder")
+    try {
+      val storage = new VFSRecordStorage[String](uri)
+      // Create the sub-folder AFTER the storage is constructed so the
+      // test pins the live-lookup behavior of containsFolder (not a
+      // value cached at construction time).
+      Files.createDirectory(path.resolve("nested"))
+      assert(storage.containsFolder("nested"))
+    } finally {
+      cleanup(path)
+    }
+  }
+
+  it should "return false for a missing child entry" in {
+    val (path, uri) = mkTempUri("contains-missing")
+    try {
+      val storage = new VFSRecordStorage[String](uri)
+      assert(!storage.containsFolder("does-not-exist"))
+    } finally {
+      cleanup(path)
+    }
+  }
+
+  it should "return false when the child entry exists but is a file (not a folder)" in {
+    // The `containsFolder` contract is "exists AND isFolder". A plain
+    // file with the requested name must NOT register as a folder.
+    val (path, uri) = mkTempUri("contains-file")
+    try {
+      val storage = new VFSRecordStorage[String](uri)
+      Files.writeString(path.resolve("looks-like-folder"), "i am a file")
+      assert(!storage.containsFolder("looks-like-folder"))
+    } finally {
+      cleanup(path)
+    }
+  }
+}
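The commit message says `SequentialRecordWriter` / `SequentialRecordReader` round-trip records through a size-prefixed binary frame and that the reader re-opens its input through an `inputStreamGen` thunk; the `EmptyRecordStorageSpec` comments add that a 0-byte input ends iteration at the first `readInt()`. The sketch below illustrates that framing idea in isolation. It assumes a 4-byte `writeInt` length prefix (consistent with the `readInt()` remark) and uses UTF-8 strings in place of `AmberRuntime.serde`. `FrameWriter`, `FrameReader` and `FrameDemo` are hypothetical names, not the Texera classes.

```scala
import java.io.{
  ByteArrayInputStream,
  ByteArrayOutputStream,
  DataInputStream,
  DataOutputStream,
  EOFException,
  InputStream
}
import java.nio.charset.StandardCharsets

// Hypothetical writer: frames each record as a 4-byte length prefix followed
// by the payload bytes. UTF-8 stands in for the Pekko serde (an assumption).
final class FrameWriter(out: DataOutputStream) {
  def writeRecord(record: String): Unit = {
    val bytes = record.getBytes(StandardCharsets.UTF_8)
    out.writeInt(bytes.length)
    out.write(bytes)
  }
  def flush(): Unit = out.flush()
  def close(): Unit = out.close()
}

// Hypothetical reader: the thunk is invoked once per iterator, so each
// iterator reads its own fresh stream and the reader keeps no shared state.
final class FrameReader(inputStreamGen: () => DataInputStream) {
  def mkRecordIterator(): Iterator[String] = {
    val in = inputStreamGen()
    // None signals end of input: readInt() throws EOFException when no
    // length prefix remains. In this sketch a truncated trailing frame
    // (readFully hitting EOF) is also treated as end of input.
    def readNext(): Option[String] =
      try {
        val buf = new Array[Byte](in.readInt())
        in.readFully(buf)
        Some(new String(buf, StandardCharsets.UTF_8))
      } catch {
        case _: EOFException =>
          in.close()
          None
      }
    Iterator.continually(readNext()).takeWhile(_.isDefined).map(_.get)
  }
}

object FrameDemo {
  def main(args: Array[String]): Unit = {
    val baos = new ByteArrayOutputStream()
    val writer = new FrameWriter(new DataOutputStream(baos))
    Seq("alpha", "beta", "gamma").foreach(writer.writeRecord)
    writer.flush()
    writer.close()

    val payload = baos.toByteArray
    val reader =
      new FrameReader(() => new DataInputStream(new ByteArrayInputStream(payload)))
    println(reader.mkRecordIterator().toList) // List(alpha, beta, gamma)
    println(reader.mkRecordIterator().toList) // List(alpha, beta, gamma)

    // A 0-byte stream, like the null-object reader, yields no records.
    val empty = new FrameReader(() => new DataInputStream(InputStream.nullInputStream()))
    println(empty.mkRecordIterator().hasNext) // false
  }
}
```

Because every `mkRecordIterator()` call invokes the thunk, two passes over the same `payload` each see the full sequence, which is the property the re-read test pins. Passing `InputStream.nullInputStream()` (Java 11+) mimics the null-object reader that `EmptyRecordStorage` is described as using.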
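The `getStorage` notes describe a dispatch on `Option[URI]` plus the URI scheme, where the `hdfs` check is a single case-insensitive comparison. A minimal model of that contract, written as a pure function, is sketched below. It assumes that every scheme other than `hdfs` (including a missing scheme) falls through to the VFS branch; `StorageKind`, `EmptyKind`, `VfsKind`, `HdfsKind` and `StorageFactory` are hypothetical labels standing in for the production classes, not part of the Texera API.

```scala
import java.net.URI

// Hypothetical labels for the three branches; the production factory
// constructs EmptyRecordStorage, VFSRecordStorage or HDFSRecordStorage.
sealed trait StorageKind
case object EmptyKind extends StorageKind
final case class VfsKind(uri: URI) extends StorageKind
final case class HdfsKind(uri: URI) extends StorageKind

object StorageFactory {
  // None -> null-object storage; an "hdfs" scheme (any case) -> HDFS;
  // everything else -> VFS. Routing scheme-less URIs to VFS is an
  // assumption of this sketch, not confirmed by the commit.
  def getStorage(location: Option[URI]): StorageKind =
    location match {
      case None => EmptyKind
      case Some(uri) if Option(uri.getScheme).exists(_.equalsIgnoreCase("hdfs")) => HdfsKind(uri)
      case Some(uri) => VfsKind(uri)
    }
}
```

Keeping the branch decision separate from construction is one way to exercise the `hdfs://` case without ever calling `FileSystem.get`; the commit instead leaves that branch to higher-level checkpoint / fault-tolerance suites.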
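Both specs release the directory handle held by `Files.walk` with an explicit try/finally before deleting children in reverse order. An equivalent shape uses `scala.util.Using` (Scala 2.13+), which closes any `AutoCloseable` when its block exits, together with a loan-pattern fixture that always removes the scratch root. `TempDirs` and `withTempRoot` are hypothetical helpers, not part of the Texera test sources.

```scala
import java.nio.file.{Files, Path}
import java.util.Comparator
import scala.util.Using

object TempDirs {
  // Delete `root` recursively, deepest entries first. Using.resource closes
  // the Files.walk stream (and its directory handle) even if traversal
  // throws, the same guarantee the specs get from try/finally.
  def deleteRecursively(root: Path): Unit =
    if (Files.exists(root))
      Using.resource(Files.walk(root)) { stream =>
        stream
          .sorted(Comparator.reverseOrder[Path]())
          .forEach(p => Files.deleteIfExists(p))
      }

  // Hypothetical loan-pattern helper: hands the body a fresh temp root and
  // removes it afterwards, whether the body returns normally or throws.
  def withTempRoot[A](prefix: String)(body: Path => A): A = {
    val root = Files.createTempDirectory(prefix)
    try body(root)
    finally deleteRecursively(root)
  }
}
```

A test body would then read `TempDirs.withTempRoot("vfs-record-storage-spec-") { root => ... }`, keeping one scratch folder per test as `mkTempUri` / `cleanup` do, without repeating the cleanup call in every `finally`.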
